From f67c2d1bd0c8abc3d4ea76deffe45fdd92ef5e05 Mon Sep 17 00:00:00 2001
From: Robert Joseph Evans
Date: Fri, 23 Mar 2012 20:46:18 +0000
Subject: [PATCH] MAPREDUCE-4043. Secret keys set in Credentials are not seen
 by tasks (Jason Lowe via bobby)

git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/trunk@1304587 13f79535-47bb-0310-9956-ffa450edef68
---
 hadoop-mapreduce-project/CHANGES.txt          |   3 +
 .../hadoop/mapred/MapTaskAttemptImpl.java     |   8 +-
 .../hadoop/mapred/ReduceTaskAttemptImpl.java  |   8 +-
 .../mapreduce/v2/app/job/impl/JobImpl.java    |   4 +-
 .../v2/app/job/impl/MapTaskImpl.java          |   9 +-
 .../v2/app/job/impl/ReduceTaskImpl.java       |   9 +-
 .../v2/app/job/impl/TaskAttemptImpl.java      |  35 +++---
 .../mapreduce/v2/app/job/impl/TaskImpl.java   |   9 +-
 .../v2/app/job/impl/TestTaskAttempt.java      | 101 +++++++++++++++++-
 .../v2/app/job/impl/TestTaskImpl.java         |  17 +--
 .../src/test/resources/krb5.conf              |  28 +++++
 11 files changed, 175 insertions(+), 56 deletions(-)
 create mode 100644 hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/resources/krb5.conf

diff --git a/hadoop-mapreduce-project/CHANGES.txt b/hadoop-mapreduce-project/CHANGES.txt
index 9686d887692..a6b73d052ab 100644
--- a/hadoop-mapreduce-project/CHANGES.txt
+++ b/hadoop-mapreduce-project/CHANGES.txt
@@ -210,6 +210,9 @@ Release 0.23.2 - UNRELEASED
     MAPREDUCE-4034. Unable to view task logs on history server with
     mapreduce.job.acl-view-job=* (Jason Lowe and Siddarth Seth via bobby)
 
+    MAPREDUCE-4043. Secret keys set in Credentials are not seen by tasks
+    (Jason Lowe via bobby)
+
   OPTIMIZATIONS
 
     MAPREDUCE-3901. Modified JobHistory records in YARN to lazily load job and
diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapred/MapTaskAttemptImpl.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapred/MapTaskAttemptImpl.java
index 4d0d17f0dbc..e60dcda47cb 100644
--- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapred/MapTaskAttemptImpl.java
+++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapred/MapTaskAttemptImpl.java
@@ -18,8 +18,6 @@
 
 package org.apache.hadoop.mapred;
 
-import java.util.Collection;
-
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.mapreduce.MRJobConfig;
 import org.apache.hadoop.mapreduce.OutputCommitter;
@@ -30,8 +28,8 @@ import org.apache.hadoop.mapreduce.v2.api.records.TaskId;
 import org.apache.hadoop.mapreduce.v2.app.AppContext;
 import org.apache.hadoop.mapreduce.v2.app.TaskAttemptListener;
 import org.apache.hadoop.mapreduce.v2.app.job.impl.TaskAttemptImpl;
+import org.apache.hadoop.security.Credentials;
 import org.apache.hadoop.security.token.Token;
-import org.apache.hadoop.security.token.TokenIdentifier;
 import org.apache.hadoop.yarn.Clock;
 import org.apache.hadoop.yarn.event.EventHandler;
 
@@ -45,11 +43,11 @@ public class MapTaskAttemptImpl extends TaskAttemptImpl {
       int partition, TaskSplitMetaInfo splitInfo, JobConf conf,
       TaskAttemptListener taskAttemptListener,
       OutputCommitter committer, Token<JobTokenIdentifier> jobToken,
-      Collection<Token<? extends TokenIdentifier>> fsTokens, Clock clock,
+      Credentials credentials, Clock clock,
       AppContext appContext) {
     super(taskId, attempt, eventHandler, taskAttemptListener, jobFile,
         partition, conf, splitInfo.getLocations(),
-        committer, jobToken, fsTokens, clock, appContext);
+        committer, jobToken, credentials, clock, appContext);
     this.splitInfo = splitInfo;
   }
 
diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapred/ReduceTaskAttemptImpl.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapred/ReduceTaskAttemptImpl.java
index 63169817ff7..3249764ba72 100644
--- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapred/ReduceTaskAttemptImpl.java
+++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapred/ReduceTaskAttemptImpl.java
@@ -18,8 +18,6 @@
 
 package org.apache.hadoop.mapred;
 
-import java.util.Collection;
-
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.mapreduce.MRJobConfig;
 import org.apache.hadoop.mapreduce.OutputCommitter;
@@ -29,8 +27,8 @@ import org.apache.hadoop.mapreduce.v2.api.records.TaskId;
 import org.apache.hadoop.mapreduce.v2.app.AppContext;
 import org.apache.hadoop.mapreduce.v2.app.TaskAttemptListener;
 import org.apache.hadoop.mapreduce.v2.app.job.impl.TaskAttemptImpl;
+import org.apache.hadoop.security.Credentials;
 import org.apache.hadoop.security.token.Token;
-import org.apache.hadoop.security.token.TokenIdentifier;
 import org.apache.hadoop.yarn.Clock;
 import org.apache.hadoop.yarn.event.EventHandler;
 
@@ -44,10 +42,10 @@ public class ReduceTaskAttemptImpl extends TaskAttemptImpl {
       int numMapTasks, JobConf conf,
       TaskAttemptListener taskAttemptListener,
       OutputCommitter committer, Token<JobTokenIdentifier> jobToken,
-      Collection<Token<? extends TokenIdentifier>> fsTokens, Clock clock,
+      Credentials credentials, Clock clock,
       AppContext appContext) {
     super(id, attempt, eventHandler, taskAttemptListener, jobFile, partition,
-        conf, new String[] {}, committer, jobToken, fsTokens, clock,
+        conf, new String[] {}, committer, jobToken, credentials, clock,
         appContext);
     this.numMapTasks = numMapTasks;
   }
diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/JobImpl.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/JobImpl.java
index 5e504ab99dd..0fca3dd6859 100644
--- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/JobImpl.java
+++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/JobImpl.java
@@ -1066,7 +1066,7 @@ public class JobImpl implements org.apache.hadoop.mapreduce.v2.app.job.Job,
             job.remoteJobConfFile,
             job.conf, splits[i],
             job.taskAttemptListener,
-            job.committer, job.jobToken, job.fsTokens.getAllTokens(),
+            job.committer, job.jobToken, job.fsTokens,
             job.clock, job.completedTasksFromPreviousRun,
             job.applicationAttemptId.getAttemptId(),
             job.metrics, job.appContext);
@@ -1084,7 +1084,7 @@ public class JobImpl implements org.apache.hadoop.mapreduce.v2.app.job.Job,
             job.remoteJobConfFile,
             job.conf, job.numMapTasks,
             job.taskAttemptListener, job.committer, job.jobToken,
-            job.fsTokens.getAllTokens(), job.clock,
+            job.fsTokens, job.clock,
             job.completedTasksFromPreviousRun,
             job.applicationAttemptId.getAttemptId(),
             job.metrics, job.appContext);
diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/MapTaskImpl.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/MapTaskImpl.java
index fd8f44738c1..db23c0a7c04 100644
--- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/MapTaskImpl.java
+++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/MapTaskImpl.java
@@ -18,7 +18,6 @@
 
 package org.apache.hadoop.mapreduce.v2.app.job.impl;
 
-import java.util.Collection;
 import java.util.Map;
 
 import org.apache.hadoop.fs.Path;
@@ -35,8 +34,8 @@ import org.apache.hadoop.mapreduce.v2.api.records.TaskType;
 import org.apache.hadoop.mapreduce.v2.app.AppContext;
 import org.apache.hadoop.mapreduce.v2.app.TaskAttemptListener;
 import org.apache.hadoop.mapreduce.v2.app.metrics.MRAppMetrics;
+import org.apache.hadoop.security.Credentials;
 import org.apache.hadoop.security.token.Token;
-import org.apache.hadoop.security.token.TokenIdentifier;
 import org.apache.hadoop.yarn.Clock;
 import org.apache.hadoop.yarn.event.EventHandler;
 
@@ -50,11 +49,11 @@ public class MapTaskImpl extends TaskImpl {
       TaskSplitMetaInfo taskSplitMetaInfo,
       TaskAttemptListener taskAttemptListener,
       OutputCommitter committer, Token<JobTokenIdentifier> jobToken,
-      Collection<Token<? extends TokenIdentifier>> fsTokens, Clock clock,
+      Credentials credentials, Clock clock,
       Map<TaskId, TaskInfo> completedTasksFromPreviousRun, int startCount,
       MRAppMetrics metrics, AppContext appContext) {
     super(jobId, TaskType.MAP, partition, eventHandler, remoteJobConfFile,
-        conf, taskAttemptListener, committer, jobToken, fsTokens, clock,
+        conf, taskAttemptListener, committer, jobToken, credentials, clock,
         completedTasksFromPreviousRun, startCount, metrics, appContext);
     this.taskSplitMetaInfo = taskSplitMetaInfo;
   }
@@ -69,7 +68,7 @@ public class MapTaskImpl extends TaskImpl {
 
     return new MapTaskAttemptImpl(getID(), nextAttemptNumber, eventHandler,
         jobFile, partition, taskSplitMetaInfo, conf, taskAttemptListener,
-        committer, jobToken, fsTokens, clock, appContext);
+        committer, jobToken, credentials, clock, appContext);
   }
 
   @Override
diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/ReduceTaskImpl.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/ReduceTaskImpl.java
index 38e8f8c17cb..13aeea071dd 100644
--- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/ReduceTaskImpl.java
+++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/ReduceTaskImpl.java
@@ -18,7 +18,6 @@
 
 package org.apache.hadoop.mapreduce.v2.app.job.impl;
 
-import java.util.Collection;
 import java.util.Map;
 
 import org.apache.hadoop.fs.Path;
@@ -34,8 +33,8 @@ import org.apache.hadoop.mapreduce.v2.api.records.TaskType;
 import org.apache.hadoop.mapreduce.v2.app.AppContext;
 import org.apache.hadoop.mapreduce.v2.app.TaskAttemptListener;
 import org.apache.hadoop.mapreduce.v2.app.metrics.MRAppMetrics;
+import org.apache.hadoop.security.Credentials;
 import org.apache.hadoop.security.token.Token;
-import org.apache.hadoop.security.token.TokenIdentifier;
 import org.apache.hadoop.yarn.Clock;
 import org.apache.hadoop.yarn.event.EventHandler;
 
@@ -48,11 +47,11 @@ public class ReduceTaskImpl extends TaskImpl {
       EventHandler eventHandler, Path jobFile, JobConf conf,
       int numMapTasks, TaskAttemptListener taskAttemptListener,
       OutputCommitter committer, Token<JobTokenIdentifier> jobToken,
-      Collection<Token<? extends TokenIdentifier>> fsTokens, Clock clock,
+      Credentials credentials, Clock clock,
      Map<TaskId, TaskInfo> completedTasksFromPreviousRun, int startCount,
       MRAppMetrics metrics, AppContext appContext) {
     super(jobId, TaskType.REDUCE, partition, eventHandler, jobFile, conf,
-        taskAttemptListener, committer, jobToken, fsTokens, clock,
+        taskAttemptListener, committer, jobToken, credentials, clock,
         completedTasksFromPreviousRun, startCount, metrics, appContext);
     this.numMapTasks = numMapTasks;
   }
@@ -67,7 +66,7 @@ public class ReduceTaskImpl extends TaskImpl {
 
     return new ReduceTaskAttemptImpl(getID(), nextAttemptNumber, eventHandler,
         jobFile, partition, numMapTasks, conf, taskAttemptListener,
-        committer, jobToken, fsTokens, clock, appContext);
+        committer, jobToken, credentials, clock, appContext);
   }
 
   @Override
diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TaskAttemptImpl.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TaskAttemptImpl.java
index dcb8972ade6..33dbb13523a 100644
--- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TaskAttemptImpl.java
+++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TaskAttemptImpl.java
@@ -24,7 +24,6 @@ import java.net.InetSocketAddress;
 import java.net.UnknownHostException;
 import java.nio.ByteBuffer;
 import java.util.ArrayList;
-import java.util.Collection;
 import java.util.EnumSet;
 import java.util.HashMap;
 import java.util.HashSet;
@@ -102,7 +101,6 @@ import org.apache.hadoop.net.NetUtils;
 import org.apache.hadoop.security.Credentials;
 import org.apache.hadoop.security.UserGroupInformation;
 import org.apache.hadoop.security.token.Token;
-import org.apache.hadoop.security.token.TokenIdentifier;
 import org.apache.hadoop.util.StringUtils;
 import org.apache.hadoop.yarn.Clock;
 import org.apache.hadoop.yarn.YarnException;
@@ -157,7 +155,7 @@ public abstract class TaskAttemptImpl implements
   private final Lock readLock;
   private final Lock writeLock;
   private final AppContext appContext;
-  private Collection<Token<? extends TokenIdentifier>> fsTokens;
+  private Credentials credentials;
   private Token<JobTokenIdentifier> jobToken;
   private static AtomicBoolean initialClasspathFlag = new AtomicBoolean();
   private static String initialClasspath = null;
@@ -458,7 +456,7 @@ public abstract class TaskAttemptImpl implements
       TaskAttemptListener taskAttemptListener, Path jobFile, int partition,
       JobConf conf, String[] dataLocalHosts, OutputCommitter committer,
       Token<JobTokenIdentifier> jobToken,
-      Collection<Token<? extends TokenIdentifier>> fsTokens, Clock clock,
+      Credentials credentials, Clock clock,
       AppContext appContext) {
     oldJobId = TypeConverter.fromYarn(taskId.getJobId());
     this.conf = conf;
@@ -477,7 +475,7 @@ public abstract class TaskAttemptImpl implements
     readLock = readWriteLock.readLock();
     writeLock = readWriteLock.writeLock();
 
-    this.fsTokens = fsTokens;
+    this.credentials = credentials;
     this.jobToken = jobToken;
     this.eventHandler = eventHandler;
     this.committer = committer;
@@ -554,7 +552,7 @@ public abstract class TaskAttemptImpl implements
       Map<ApplicationAccessType, String> applicationACLs, Configuration conf,
       Token<JobTokenIdentifier> jobToken,
       final org.apache.hadoop.mapred.JobID oldJobId,
-      Collection<Token<? extends TokenIdentifier>> fsTokens) {
+      Credentials credentials) {
 
     // Application resources
     Map<String, LocalResource> localResources =
@@ -567,7 +565,7 @@
     Map<String, ByteBuffer> serviceData = new HashMap<String, ByteBuffer>();
 
     // Tokens
-    ByteBuffer tokens = ByteBuffer.wrap(new byte[]{});
+    ByteBuffer taskCredentialsBuffer = ByteBuffer.wrap(new byte[]{});
 
     try {
       FileSystem remoteFS = FileSystem.get(conf);
@@ -609,16 +607,14 @@
       // Setup DistributedCache
       MRApps.setupDistributedCache(conf, localResources);
 
-      // Setup up tokens
+      // Set up the task credentials buffer
       Credentials taskCredentials = new Credentials();
 
       if (UserGroupInformation.isSecurityEnabled()) {
-        // Add file-system tokens
-        for (Token<? extends TokenIdentifier> token : fsTokens) {
-          LOG.info("Putting fs-token for NM use for launching container : "
-              + token.toString());
-          taskCredentials.addToken(token.getService(), token);
-        }
+        LOG.info("Adding #" + credentials.numberOfTokens()
+            + " tokens and #" + credentials.numberOfSecretKeys()
+            + " secret keys for NM use for launching container");
+        taskCredentials.addAll(credentials);
       }
 
       // LocalStorageToken is needed irrespective of whether security is enabled
@@ -629,7 +625,7 @@
       LOG.info("Size of containertokens_dob is "
           + taskCredentials.numberOfTokens());
       taskCredentials.writeTokenStorageToStream(containerTokens_dob);
-      tokens =
+      taskCredentialsBuffer =
           ByteBuffer.wrap(containerTokens_dob.getData(), 0,
               containerTokens_dob.getLength());
 
@@ -674,7 +670,8 @@
     ContainerLaunchContext container = BuilderUtils
         .newContainerLaunchContext(null, conf
            .get(MRJobConfig.USER_NAME), null, localResources,
-            environment, null, serviceData, tokens, applicationACLs);
+            environment, null, serviceData, taskCredentialsBuffer,
+            applicationACLs);
 
     return container;
   }
@@ -686,12 +683,12 @@ public abstract class TaskAttemptImpl implements
       final org.apache.hadoop.mapred.JobID oldJobId,
       Resource assignedCapability, WrappedJvmID jvmID,
       TaskAttemptListener taskAttemptListener,
-      Collection<Token<? extends TokenIdentifier>> fsTokens) {
+      Credentials credentials) {
 
     synchronized (commonContainerSpecLock) {
       if (commonContainerSpec == null) {
         commonContainerSpec = createCommonContainerLaunchContext(
-            applicationACLs, conf, jobToken, oldJobId, fsTokens);
+            applicationACLs, conf, jobToken, oldJobId, credentials);
       }
     }
@@ -1162,7 +1159,7 @@
           taskAttempt.conf, taskAttempt.jobToken, taskAttempt.remoteTask,
           taskAttempt.oldJobId, taskAttempt.assignedCapability,
           taskAttempt.jvmID, taskAttempt.taskAttemptListener,
-          taskAttempt.fsTokens);
+          taskAttempt.credentials);
       taskAttempt.eventHandler.handle(new ContainerRemoteLaunchEvent(
           taskAttempt.attemptId, taskAttempt.containerID,
           taskAttempt.containerMgrAddress, taskAttempt.containerToken,
diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TaskImpl.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TaskImpl.java
index 4447c1d2932..174e9d1f443 100644
--- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TaskImpl.java
+++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TaskImpl.java
@@ -19,7 +19,6 @@
 package org.apache.hadoop.mapreduce.v2.app.job.impl;
 
 import java.util.ArrayList;
-import java.util.Collection;
 import java.util.Collections;
 import java.util.Comparator;
 import java.util.EnumSet;
@@ -72,8 +71,8 @@ import org.apache.hadoop.mapreduce.v2.app.job.event.TaskTAttemptEvent;
 import org.apache.hadoop.mapreduce.v2.app.metrics.MRAppMetrics;
 import org.apache.hadoop.mapreduce.v2.app.rm.ContainerFailedEvent;
 import org.apache.hadoop.mapreduce.v2.util.MRBuilderUtils;
+import org.apache.hadoop.security.Credentials;
 import org.apache.hadoop.security.token.Token;
-import org.apache.hadoop.security.token.TokenIdentifier;
 import org.apache.hadoop.yarn.Clock;
 import org.apache.hadoop.yarn.event.EventHandler;
 import org.apache.hadoop.yarn.factories.RecordFactory;
@@ -110,7 +109,7 @@ public abstract class TaskImpl implements Task, EventHandler<TaskEvent> {
   private final RecordFactory recordFactory =
       RecordFactoryProvider.getRecordFactory(null);
 
-  protected Collection<Token<? extends TokenIdentifier>> fsTokens;
+  protected Credentials credentials;
   protected Token<JobTokenIdentifier> jobToken;
 
   // counts the number of attempts that are either running or in a state where
@@ -251,7 +250,7 @@ public abstract class TaskImpl implements Task, EventHandler<TaskEvent> {
       EventHandler eventHandler, Path remoteJobConfFile, JobConf conf,
       TaskAttemptListener taskAttemptListener, OutputCommitter committer,
       Token<JobTokenIdentifier> jobToken,
-      Collection<Token<? extends TokenIdentifier>> fsTokens, Clock clock,
+      Credentials credentials, Clock clock,
       Map<TaskId, TaskInfo> completedTasksFromPreviousRun, int startCount,
       MRAppMetrics metrics, AppContext appContext) {
     this.conf = conf;
@@ -270,7 +269,7 @@ public abstract class TaskImpl implements Task, EventHandler<TaskEvent> {
     this.taskAttemptListener = taskAttemptListener;
     this.eventHandler = eventHandler;
     this.committer = committer;
-    this.fsTokens = fsTokens;
+    this.credentials = credentials;
     this.jobToken = jobToken;
     this.metrics = metrics;
     this.appContext = appContext;
diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TestTaskAttempt.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TestTaskAttempt.java
index e746726a47a..1b54a1ae667 100644
--- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TestTaskAttempt.java
+++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TestTaskAttempt.java
@@ -25,6 +25,9 @@ import static org.mockito.Mockito.times;
 import static org.mockito.Mockito.verify;
 import static org.mockito.Mockito.when;
 
+import java.io.IOException;
+import java.net.InetSocketAddress;
+import java.util.Arrays;
 import java.util.HashMap;
 import java.util.Iterator;
 import java.util.Map;
@@ -32,14 +35,23 @@
 import junit.framework.Assert;
 
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.CommonConfigurationKeysPublic;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.RawLocalFileSystem;
+import org.apache.hadoop.io.DataInputByteBuffer;
+import org.apache.hadoop.io.Text;
 import org.apache.hadoop.mapred.JobConf;
 import org.apache.hadoop.mapred.MapTaskAttemptImpl;
+import org.apache.hadoop.mapred.WrappedJvmID;
 import org.apache.hadoop.mapreduce.JobCounter;
 import org.apache.hadoop.mapreduce.MRJobConfig;
 import org.apache.hadoop.mapreduce.OutputCommitter;
+import org.apache.hadoop.mapreduce.TypeConverter;
 import org.apache.hadoop.mapreduce.jobhistory.JobHistoryEvent;
 import org.apache.hadoop.mapreduce.jobhistory.TaskAttemptUnsuccessfulCompletion;
+import org.apache.hadoop.mapreduce.security.token.JobTokenIdentifier;
 import org.apache.hadoop.mapreduce.split.JobSplit.TaskSplitMetaInfo;
 import org.apache.hadoop.mapreduce.v2.api.records.JobId;
 import org.apache.hadoop.mapreduce.v2.api.records.JobState;
@@ -61,21 +73,106 @@ import org.apache.hadoop.mapreduce.v2.app.job.event.TaskAttemptEvent;
 import org.apache.hadoop.mapreduce.v2.app.job.event.TaskAttemptEventType;
 import org.apache.hadoop.mapreduce.v2.app.rm.ContainerRequestEvent;
 import org.apache.hadoop.mapreduce.v2.util.MRBuilderUtils;
+import org.apache.hadoop.security.Credentials;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hadoop.security.token.Token;
+import org.apache.hadoop.security.token.TokenIdentifier;
 import org.apache.hadoop.yarn.Clock;
 import org.apache.hadoop.yarn.ClusterInfo;
 import org.apache.hadoop.yarn.SystemClock;
+import org.apache.hadoop.yarn.api.records.ApplicationAccessType;
 import org.apache.hadoop.yarn.api.records.ApplicationId;
+import org.apache.hadoop.yarn.api.records.ContainerId;
+import org.apache.hadoop.yarn.api.records.ContainerLaunchContext;
+import org.apache.hadoop.yarn.api.records.Resource;
 import org.apache.hadoop.yarn.event.Event;
 import org.apache.hadoop.yarn.event.EventHandler;
 import org.apache.hadoop.yarn.util.BuilderUtils;
 import org.junit.Test;
 import org.mockito.ArgumentCaptor;
 
-import com.sun.source.tree.AssertTree;
-
 @SuppressWarnings("unchecked")
 public class TestTaskAttempt{
+
+  @SuppressWarnings("rawtypes")
+  @Test
+  public void testAttemptContainerRequest() throws Exception {
+    final Text SECRET_KEY_ALIAS = new Text("secretkeyalias");
+    final byte[] SECRET_KEY = ("secretkey").getBytes();
+    Map<ApplicationAccessType, String> acls =
+        new HashMap<ApplicationAccessType, String>(1);
+    acls.put(ApplicationAccessType.VIEW_APP, "otheruser");
+    ApplicationId appId = BuilderUtils.newApplicationId(1, 1);
+    JobId jobId = MRBuilderUtils.newJobId(appId, 1);
+    TaskId taskId = MRBuilderUtils.newTaskId(jobId, 1, TaskType.MAP);
+    Path jobFile = mock(Path.class);
+
+    EventHandler eventHandler = mock(EventHandler.class);
+    TaskAttemptListener taListener = mock(TaskAttemptListener.class);
+    when(taListener.getAddress()).thenReturn(new InetSocketAddress("localhost", 0));
+
+    JobConf jobConf = new JobConf();
+    jobConf.setClass("fs.file.impl", StubbedFS.class, FileSystem.class);
+    jobConf.setBoolean("fs.file.impl.disable.cache", true);
+    jobConf.set(JobConf.MAPRED_MAP_TASK_ENV, "");
+
+    // setup UGI for security so tokens and keys are preserved
+    jobConf.set(CommonConfigurationKeysPublic.HADOOP_SECURITY_AUTHENTICATION, "kerberos");
+    UserGroupInformation.setConfiguration(jobConf);
+
+    Credentials credentials = new Credentials();
+    credentials.addSecretKey(SECRET_KEY_ALIAS, SECRET_KEY);
+    Token<JobTokenIdentifier> jobToken = new Token<JobTokenIdentifier>(
+        ("tokenid").getBytes(), ("tokenpw").getBytes(),
+        new Text("tokenkind"), new Text("tokenservice"));
+
+    TaskAttemptImpl taImpl =
+        new MapTaskAttemptImpl(taskId, 1, eventHandler, jobFile, 1,
+            mock(TaskSplitMetaInfo.class), jobConf, taListener,
+            mock(OutputCommitter.class), jobToken, credentials,
+            new SystemClock(), null);
+
+    jobConf.set(MRJobConfig.APPLICATION_ATTEMPT_ID, taImpl.getID().toString());
+    ContainerId containerId = BuilderUtils.newContainerId(1, 1, 1, 1);
+
+    ContainerLaunchContext launchCtx =
+        TaskAttemptImpl.createContainerLaunchContext(acls, containerId,
+            jobConf, jobToken, taImpl.createRemoteTask(),
+            TypeConverter.fromYarn(jobId), mock(Resource.class),
+            mock(WrappedJvmID.class), taListener,
+            credentials);
+
+    Assert.assertEquals("ACLs mismatch", acls, launchCtx.getApplicationACLs());
+    Credentials launchCredentials = new Credentials();
+
+    DataInputByteBuffer dibb = new DataInputByteBuffer();
+    dibb.reset(launchCtx.getContainerTokens());
+    launchCredentials.readTokenStorageStream(dibb);
+
+    // verify all tokens specified for the task attempt are in the launch context
+    for (Token<? extends TokenIdentifier> token : credentials.getAllTokens()) {
+      Token<? extends TokenIdentifier> launchToken =
+          launchCredentials.getToken(token.getService());
+      Assert.assertNotNull("Token " + token.getService() + " is missing",
+          launchToken);
+      Assert.assertEquals("Token " + token.getService() + " mismatch",
+          token, launchToken);
+    }
+
+    // verify the secret key is in the launch context
+    Assert.assertNotNull("Secret key missing",
+        launchCredentials.getSecretKey(SECRET_KEY_ALIAS));
+    Assert.assertTrue("Secret key mismatch", Arrays.equals(SECRET_KEY,
+        launchCredentials.getSecretKey(SECRET_KEY_ALIAS)));
+  }
+
+  static public class StubbedFS extends RawLocalFileSystem {
+    @Override
+    public FileStatus getFileStatus(Path f) throws IOException {
+      return new FileStatus(1, false, 1, 1, 1, f);
+    }
+  }
+
   @Test
   public void testMRAppHistoryForMap() throws Exception {
     MRApp app = new FailingAttemptsMRApp(1, 0);
diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TestTaskImpl.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TestTaskImpl.java
index a42fda16677..4ff6001bf58 100644
--- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TestTaskImpl.java
+++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TestTaskImpl.java
@@ -51,6 +51,7 @@ import org.apache.hadoop.mapreduce.v2.app.job.event.TaskEvent;
 import org.apache.hadoop.mapreduce.v2.app.job.event.TaskEventType;
 import org.apache.hadoop.mapreduce.v2.app.job.event.TaskTAttemptEvent;
 import org.apache.hadoop.mapreduce.v2.app.metrics.MRAppMetrics;
+import org.apache.hadoop.security.Credentials;
 import org.apache.hadoop.security.token.Token;
 import org.apache.hadoop.security.token.TokenIdentifier;
 import org.apache.hadoop.yarn.Clock;
@@ -74,7 +75,7 @@ public class TestTaskImpl {
   private Token<JobTokenIdentifier> jobToken;
   private JobId jobId;
   private Path remoteJobConfFile;
-  private Collection<Token<? extends TokenIdentifier>> fsTokens;
+  private Credentials credentials;
   private Clock clock;
   private Map<TaskId, TaskInfo> completedTasksFromPreviousRun;
   private MRAppMetrics metrics;
@@ -100,12 +101,12 @@ public class TestTaskImpl {
         EventHandler eventHandler, Path remoteJobConfFile, JobConf conf,
         TaskAttemptListener taskAttemptListener, OutputCommitter committer,
         Token<JobTokenIdentifier> jobToken,
-        Collection<Token<? extends TokenIdentifier>> fsTokens, Clock clock,
+        Credentials credentials, Clock clock,
         Map<TaskId, TaskInfo> completedTasksFromPreviousRun, int startCount,
         MRAppMetrics metrics, AppContext appContext) {
       super(jobId, taskType , partition, eventHandler,
           remoteJobConfFile, conf, taskAttemptListener, committer,
-          jobToken, fsTokens, clock,
+          jobToken, credentials, clock,
           completedTasksFromPreviousRun, startCount, metrics, appContext);
     }
 
@@ -118,7 +119,7 @@ public class TestTaskImpl {
     protected TaskAttemptImpl createAttempt() {
       MockTaskAttemptImpl attempt = new MockTaskAttemptImpl(getID(), ++taskAttemptCounter,
           eventHandler, taskAttemptListener, remoteJobConfFile, partition,
-          conf, committer, jobToken, fsTokens, clock, appContext);
+          conf, committer, jobToken, credentials, clock, appContext);
       taskAttempts.add(attempt);
       return attempt;
     }
@@ -140,10 +141,10 @@ public class TestTaskImpl {
         TaskAttemptListener taskAttemptListener, Path jobFile, int partition,
         JobConf conf, OutputCommitter committer,
         Token<JobTokenIdentifier> jobToken,
-        Collection<Token<? extends TokenIdentifier>> fsTokens, Clock clock,
+        Credentials credentials, Clock clock,
         AppContext appContext) {
       super(taskId, id, eventHandler, taskAttemptListener, jobFile, partition, conf,
-          dataLocations, committer, jobToken, fsTokens, clock, appContext);
+          dataLocations, committer, jobToken, credentials, clock, appContext);
       attemptId = Records.newRecord(TaskAttemptId.class);
       attemptId.setId(id);
       attemptId.setTaskId(taskId);
@@ -203,7 +204,7 @@ public class TestTaskImpl {
     committer = mock(OutputCommitter.class);
     jobToken = (Token) mock(Token.class);
     remoteJobConfFile = mock(Path.class);
-    fsTokens = null;
+    credentials = null;
     clock = new SystemClock();
     metrics = mock(MRAppMetrics.class);
     dataLocations = new String[1];
@@ -224,7 +225,7 @@ public class TestTaskImpl {
 
     mockTask = new MockTaskImpl(jobId, partition, dispatcher.getEventHandler(),
         remoteJobConfFile, conf, taskAttemptListener, committer, jobToken,
-        fsTokens, clock,
+        credentials, clock,
        completedTasksFromPreviousRun, startCount, metrics, appContext);
 
diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/resources/krb5.conf b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/resources/krb5.conf
new file mode 100644
index 00000000000..121ac6d9b98
--- /dev/null
+++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/resources/krb5.conf
@@ -0,0 +1,28 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+[libdefaults]
+  default_realm = APACHE.ORG
+  udp_preference_limit = 1
+  extra_addresses = 127.0.0.1
+[realms]
+  APACHE.ORG = {
+    admin_server = localhost:88
+    kdc = localhost:88
+  }
+[domain_realm]
+  localhost = APACHE.ORG
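
For reference, a minimal sketch of the round trip this patch repairs. It is not
part of the patch; it assumes the standard MapReduce client APIs used above
(Job.getCredentials, Credentials.addSecretKey, JobContext.getCredentials), and
the job name, key alias, and class names are illustrative only:

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;

public class SecretKeyRoundTrip {

  // Submission side: attach an application secret to the job's Credentials.
  public static void submit(Configuration conf) throws Exception {
    Job job = Job.getInstance(conf, "credentials-demo");    // illustrative name
    job.getCredentials().addSecretKey(
        new Text("my.secret.alias"), "s3cr3t".getBytes());  // illustrative alias
    // ... configure mapper/reducer and input/output paths, then job.submit()
  }

  // Task side: before this fix, the MR AM forwarded only the filesystem tokens
  // (Collection<Token<? extends TokenIdentifier>>) into the task container, so
  // secret keys set in Credentials never reached the task. With the patch the
  // AM forwards the whole Credentials object.
  public static class DemoMapper extends Mapper<Object, Text, Text, Text> {
    @Override
    protected void setup(Context context) {
      byte[] secret = context.getCredentials()
          .getSecretKey(new Text("my.secret.alias"));
      // 'secret' is non-null once MAPREDUCE-4043 is applied.
    }
  }
}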