diff --git a/hadoop-yarn-project/CHANGES.txt b/hadoop-yarn-project/CHANGES.txt index 5842718643d..776d6be8f5f 100644 --- a/hadoop-yarn-project/CHANGES.txt +++ b/hadoop-yarn-project/CHANGES.txt @@ -802,6 +802,10 @@ Release 2.6.0 - UNRELEASED YARN-2711. Fixed TestDefaultContainerExecutor#testContainerLaunchError failure on Windows. (Varun Vasudev via zjshen) + YARN-2790. Fixed a NodeManager bug that was causing log-aggregation to fail + beyond HFDS delegation-token expiry even when RM is a proxy-user (YARN-2704). + (Jian He via vinodkv) + Release 2.5.1 - 2014-09-05 INCOMPATIBLE CHANGES diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeManager.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeManager.java index 22057f45674..4f90bf53e6f 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeManager.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeManager.java @@ -433,7 +433,7 @@ public class NodeManager extends CompositeService return systemCredentials; } - public void setSystemCrendentials( + public void setSystemCrendentialsForApps( Map systemCredentials) { this.systemCredentials = systemCredentials; } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeStatusUpdaterImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeStatusUpdaterImpl.java index 1c3ac5cff3c..ebbe503bcd8 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeStatusUpdaterImpl.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeStatusUpdaterImpl.java @@ -626,7 +626,7 @@ public class NodeStatusUpdaterImpl extends AbstractService implements response.getSystemCredentialsForApps(); if (systemCredentials != null && !systemCredentials.isEmpty()) { ((NMContext) context) - .setSystemCrendentials(parseCredentials(systemCredentials)); + .setSystemCrendentialsForApps(parseCredentials(systemCredentials)); } } catch (ConnectException e) { //catch and throw the exception if tried MAX wait time to connect RM diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/ResourceLocalizationService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/ResourceLocalizationService.java index 08ed3a1fb32..cb56d67a884 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/ResourceLocalizationService.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/ResourceLocalizationService.java @@ -1122,9 +1122,9 @@ public class ResourceLocalizationService extends CompositeService if (systemCredentials == null) { return null; } - LOG.info("Adding new framework tokens from RM for " + appId); for (Token token : systemCredentials.getAllTokens()) { - LOG.info("Adding new application-token for localization: " + token); + LOG.info("Adding new framework-token for " + appId + + " for localization: " + token); } return systemCredentials; } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/AppLogAggregatorImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/AppLogAggregatorImpl.java index 43cd7b5012d..3174c5cfeeb 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/AppLogAggregatorImpl.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/AppLogAggregatorImpl.java @@ -21,7 +21,6 @@ package org.apache.hadoop.yarn.server.nodemanager.containermanager.logaggregatio import java.io.IOException; import java.security.PrivilegedExceptionAction; import java.util.ArrayList; -import java.util.ArrayList; import java.util.Arrays; import java.util.Collections; import java.util.Comparator; @@ -43,19 +42,21 @@ import org.apache.hadoop.fs.FileStatus; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.hadoop.fs.UnsupportedFileSystemException; +import org.apache.hadoop.security.Credentials; import org.apache.hadoop.security.UserGroupInformation; +import org.apache.hadoop.security.token.Token; import org.apache.hadoop.util.StringUtils; -import org.apache.hadoop.yarn.api.records.ApplicationId; import org.apache.hadoop.yarn.api.records.ApplicationAccessType; +import org.apache.hadoop.yarn.api.records.ApplicationId; import org.apache.hadoop.yarn.api.records.ContainerId; import org.apache.hadoop.yarn.api.records.LogAggregationContext; import org.apache.hadoop.yarn.api.records.NodeId; import org.apache.hadoop.yarn.conf.YarnConfiguration; import org.apache.hadoop.yarn.event.Dispatcher; -import org.apache.hadoop.yarn.logaggregation.ContainerLogsRetentionPolicy; import org.apache.hadoop.yarn.logaggregation.AggregatedLogFormat.LogKey; import org.apache.hadoop.yarn.logaggregation.AggregatedLogFormat.LogValue; import org.apache.hadoop.yarn.logaggregation.AggregatedLogFormat.LogWriter; +import org.apache.hadoop.yarn.logaggregation.ContainerLogsRetentionPolicy; import org.apache.hadoop.yarn.logaggregation.LogAggregationUtils; import org.apache.hadoop.yarn.server.nodemanager.Context; import org.apache.hadoop.yarn.server.nodemanager.DeletionService; @@ -197,6 +198,19 @@ public class AppLogAggregatorImpl implements AppLogAggregator { return; } + if (UserGroupInformation.isSecurityEnabled()) { + Credentials systemCredentials = + context.getSystemCredentialsForApps().get(appId); + if (systemCredentials != null) { + for (Token token : systemCredentials.getAllTokens()) { + LOG.info("Adding new framework-token for " + appId + + " for log-aggregation: " + token + " user=" + userUgi); + } + // this will replace old token + userUgi.addCredentials(systemCredentials); + } + } + // Create a set of Containers whose logs will be uploaded in this cycle. // It includes: // a) all containers in pendingContainers: those containers are finished @@ -538,4 +552,10 @@ public class AppLogAggregatorImpl implements AppLogAggregator { return logValue.getCurrentUpLoadedFilesPath(); } } + + // only for test + @VisibleForTesting + public UserGroupInformation getUgi() { + return this.userUgi; + } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/LogAggregationService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/LogAggregationService.java index cc717d7a06b..bd3e847579d 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/LogAggregationService.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/LogAggregationService.java @@ -39,7 +39,6 @@ import org.apache.hadoop.fs.Path; import org.apache.hadoop.fs.permission.FsPermission; import org.apache.hadoop.security.Credentials; import org.apache.hadoop.security.UserGroupInformation; -import org.apache.hadoop.security.token.Token; import org.apache.hadoop.service.AbstractService; import org.apache.hadoop.yarn.api.records.ApplicationAccessType; import org.apache.hadoop.yarn.api.records.ApplicationId; @@ -344,18 +343,6 @@ public class LogAggregationService extends AbstractService implements Map appAcls, LogAggregationContext logAggregationContext) { - if (UserGroupInformation.isSecurityEnabled()) { - Credentials systemCredentials = - context.getSystemCredentialsForApps().get(appId); - if (systemCredentials != null) { - LOG.info("Adding new framework tokens from RM for " + appId); - for (Token token : systemCredentials.getAllTokens()) { - LOG.info("Adding new application-token for log-aggregation: " + token); - } - credentials = systemCredentials; - } - } - // Get user's FileSystem credentials final UserGroupInformation userUgi = UserGroupInformation.createRemoteUser(user); diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/TestLogAggregationService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/TestLogAggregationService.java index cea71fa4a64..419de88a601 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/TestLogAggregationService.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/TestLogAggregationService.java @@ -55,11 +55,11 @@ import java.util.Set; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentMap; -import org.junit.Assert; import org.apache.commons.lang.StringUtils; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.AbstractFileSystem; +import org.apache.hadoop.fs.CommonConfigurationKeysPublic; import org.apache.hadoop.fs.FileContext; import org.apache.hadoop.fs.FileStatus; import org.apache.hadoop.fs.FileSystem; @@ -67,8 +67,11 @@ import org.apache.hadoop.fs.Path; import org.apache.hadoop.fs.RemoteIterator; import org.apache.hadoop.fs.UnsupportedFileSystemException; import org.apache.hadoop.fs.permission.FsPermission; +import org.apache.hadoop.io.Text; import org.apache.hadoop.security.Credentials; import org.apache.hadoop.security.UserGroupInformation; +import org.apache.hadoop.security.token.Token; +import org.apache.hadoop.test.GenericTestUtils; import org.apache.hadoop.yarn.api.protocolrecords.StartContainerRequest; import org.apache.hadoop.yarn.api.protocolrecords.StartContainersRequest; import org.apache.hadoop.yarn.api.records.ApplicationAccessType; @@ -93,10 +96,11 @@ import org.apache.hadoop.yarn.exceptions.YarnRuntimeException; import org.apache.hadoop.yarn.factories.RecordFactory; import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider; import org.apache.hadoop.yarn.logaggregation.AggregatedLogFormat; -import org.apache.hadoop.yarn.logaggregation.LogAggregationUtils; import org.apache.hadoop.yarn.logaggregation.AggregatedLogFormat.LogKey; import org.apache.hadoop.yarn.logaggregation.AggregatedLogFormat.LogReader; import org.apache.hadoop.yarn.logaggregation.ContainerLogsRetentionPolicy; +import org.apache.hadoop.yarn.logaggregation.LogAggregationUtils; +import org.apache.hadoop.yarn.security.client.RMDelegationTokenIdentifier; import org.apache.hadoop.yarn.server.nodemanager.CMgrCompletedAppsEvent; import org.apache.hadoop.yarn.server.nodemanager.Context; import org.apache.hadoop.yarn.server.nodemanager.DeletionService; @@ -107,19 +111,22 @@ import org.apache.hadoop.yarn.server.nodemanager.containermanager.TestContainerM import org.apache.hadoop.yarn.server.nodemanager.containermanager.application.Application; import org.apache.hadoop.yarn.server.nodemanager.containermanager.application.ApplicationEvent; import org.apache.hadoop.yarn.server.nodemanager.containermanager.application.ApplicationEventType; -import org.apache.hadoop.yarn.server.nodemanager.containermanager.loghandler.TestNonAggregatingLogHandler; import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.Container; +import org.apache.hadoop.yarn.server.nodemanager.containermanager.loghandler.TestNonAggregatingLogHandler; import org.apache.hadoop.yarn.server.nodemanager.containermanager.loghandler.event.LogHandlerAppFinishedEvent; import org.apache.hadoop.yarn.server.nodemanager.containermanager.loghandler.event.LogHandlerAppStartedEvent; import org.apache.hadoop.yarn.server.nodemanager.containermanager.loghandler.event.LogHandlerContainerFinishedEvent; import org.apache.hadoop.yarn.server.utils.BuilderUtils; import org.apache.hadoop.yarn.util.ConverterUtils; import org.apache.hadoop.yarn.util.Records; +import org.junit.Assert; import org.junit.Test; import org.mockito.ArgumentCaptor; import org.mockito.Mockito; import org.mortbay.util.MultiException; +import com.google.common.base.Supplier; + //@Ignore public class TestLogAggregationService extends BaseContainerManagerTest { @@ -152,6 +159,7 @@ public class TestLogAggregationService extends BaseContainerManagerTest { dispatcher = createDispatcher(); appEventHandler = mock(EventHandler.class); dispatcher.register(ApplicationEventType.class, appEventHandler); + UserGroupInformation.setConfiguration(conf); } @Override @@ -1424,6 +1432,64 @@ public class TestLogAggregationService extends BaseContainerManagerTest { dispatcher.stop(); } + + @Test (timeout = 20000) + public void testAddNewTokenSentFromRMForLogAggregation() throws Exception { + Configuration conf = new YarnConfiguration(); + conf.set(CommonConfigurationKeysPublic.HADOOP_SECURITY_AUTHENTICATION, + "kerberos"); + UserGroupInformation.setConfiguration(conf); + DrainDispatcher dispatcher = createDispatcher(); + dispatcher.register(ApplicationEventType.class, appEventHandler); + + ApplicationId application1 = BuilderUtils.newApplicationId(1234, 1); + Application mockApp = mock(Application.class); + when(mockApp.getContainers()).thenReturn( + new HashMap()); + this.context.getApplications().put(application1, mockApp); + @SuppressWarnings("resource") + LogAggregationService logAggregationService = + new LogAggregationService(dispatcher, this.context, this.delSrvc, + super.dirsHandler); + logAggregationService.init(this.conf); + logAggregationService.start(); + logAggregationService.handle(new LogHandlerAppStartedEvent(application1, + this.user, null, ContainerLogsRetentionPolicy.ALL_CONTAINERS, this.acls, + Records.newRecord(LogAggregationContext.class))); + + // Inject new token for log-aggregation after app log-aggregator init + Text userText1 = new Text("user1"); + RMDelegationTokenIdentifier dtId1 = + new RMDelegationTokenIdentifier(userText1, new Text("renewer1"), + userText1); + final Token token1 = + new Token(dtId1.getBytes(), + "password1".getBytes(), dtId1.getKind(), new Text("service1")); + Credentials credentials = new Credentials(); + credentials.addToken(userText1, token1); + this.context.getSystemCredentialsForApps().put(application1, credentials); + + logAggregationService.handle(new LogHandlerAppFinishedEvent(application1)); + + final UserGroupInformation ugi = + ((AppLogAggregatorImpl) logAggregationService.getAppLogAggregators() + .get(application1)).getUgi(); + + GenericTestUtils.waitFor(new Supplier() { + public Boolean get() { + boolean hasNewToken = false; + for (Token token : ugi.getCredentials().getAllTokens()) { + if (token.equals(token1)) { + hasNewToken = true; + } + } + return hasNewToken; + } + }, 1000, 20000); + logAggregationService.stop(); + dispatcher.stop(); + } + private int numOfLogsAvailable(LogAggregationService logAggregationService, ApplicationId appId, boolean sizeLimited, String lastLogFile) throws IOException {