MAPREDUCE-4043. Secret keys set in Credentials are not seen by tasks (Jason Lowe via bobby)
git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/trunk@1304587 13f79535-47bb-0310-9956-ffa450edef68
parent 081eda94fe
commit f67c2d1bd0
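Note: the bug was that JobImpl flattened the job's Credentials to a token collection via getAllTokens() before handing it to tasks, so secret keys set with Credentials.addSecretKey() never reached the container. The patch below threads the full Credentials object (tokens plus secret keys) from JobImpl through TaskImpl/TaskAttemptImpl into the container launch context. A minimal sketch of the before/after copying, using only stock Credentials APIs (class name and key alias are illustrative, not from the patch):

import org.apache.hadoop.io.Text;
import org.apache.hadoop.security.Credentials;
import org.apache.hadoop.security.token.Token;
import org.apache.hadoop.security.token.TokenIdentifier;

public class CredentialsCopySketch {
  public static void main(String[] args) {
    Credentials jobCredentials = new Credentials();
    jobCredentials.addSecretKey(new Text("myapp.secret"), "s3cr3t".getBytes());

    // Before: copying only the tokens silently drops every secret key.
    Credentials tokensOnly = new Credentials();
    for (Token<? extends TokenIdentifier> t : jobCredentials.getAllTokens()) {
      tokensOnly.addToken(t.getService(), t);
    }

    // After: addAll() carries tokens and secret keys together.
    Credentials taskCredentials = new Credentials();
    taskCredentials.addAll(jobCredentials);

    System.out.println("tokens-only secret keys: "
        + tokensOnly.numberOfSecretKeys());      // 0 -- the bug
    System.out.println("addAll secret keys: "
        + taskCredentials.numberOfSecretKeys()); // 1 -- the fix
  }
}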
CHANGES.txt
@@ -210,6 +210,9 @@ Release 0.23.2 - UNRELEASED
     MAPREDUCE-4034. Unable to view task logs on history server with
     mapreduce.job.acl-view-job=* (Jason Lowe and Siddarth Seth via bobby)
 
+    MAPREDUCE-4043. Secret keys set in Credentials are not seen by tasks
+    (Jason Lowe via bobby)
+
   OPTIMIZATIONS
 
     MAPREDUCE-3901. Modified JobHistory records in YARN to lazily load job and
MapTaskAttemptImpl.java
@@ -18,8 +18,6 @@
 
 package org.apache.hadoop.mapred;
 
-import java.util.Collection;
-
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.mapreduce.MRJobConfig;
 import org.apache.hadoop.mapreduce.OutputCommitter;
@@ -30,8 +28,8 @@ import org.apache.hadoop.mapreduce.v2.api.records.TaskId;
 import org.apache.hadoop.mapreduce.v2.app.AppContext;
 import org.apache.hadoop.mapreduce.v2.app.TaskAttemptListener;
 import org.apache.hadoop.mapreduce.v2.app.job.impl.TaskAttemptImpl;
+import org.apache.hadoop.security.Credentials;
 import org.apache.hadoop.security.token.Token;
-import org.apache.hadoop.security.token.TokenIdentifier;
 import org.apache.hadoop.yarn.Clock;
 import org.apache.hadoop.yarn.event.EventHandler;
 
@@ -45,11 +43,11 @@ public class MapTaskAttemptImpl extends TaskAttemptImpl {
       int partition, TaskSplitMetaInfo splitInfo, JobConf conf,
       TaskAttemptListener taskAttemptListener,
       OutputCommitter committer, Token<JobTokenIdentifier> jobToken,
-      Collection<Token<? extends TokenIdentifier>> fsTokens, Clock clock,
+      Credentials credentials, Clock clock,
       AppContext appContext) {
     super(taskId, attempt, eventHandler,
         taskAttemptListener, jobFile, partition, conf, splitInfo.getLocations(),
-        committer, jobToken, fsTokens, clock, appContext);
+        committer, jobToken, credentials, clock, appContext);
     this.splitInfo = splitInfo;
   }
 
ReduceTaskAttemptImpl.java
@@ -18,8 +18,6 @@
 
 package org.apache.hadoop.mapred;
 
-import java.util.Collection;
-
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.mapreduce.MRJobConfig;
 import org.apache.hadoop.mapreduce.OutputCommitter;
@@ -29,8 +27,8 @@ import org.apache.hadoop.mapreduce.v2.api.records.TaskId;
 import org.apache.hadoop.mapreduce.v2.app.AppContext;
 import org.apache.hadoop.mapreduce.v2.app.TaskAttemptListener;
 import org.apache.hadoop.mapreduce.v2.app.job.impl.TaskAttemptImpl;
+import org.apache.hadoop.security.Credentials;
 import org.apache.hadoop.security.token.Token;
-import org.apache.hadoop.security.token.TokenIdentifier;
 import org.apache.hadoop.yarn.Clock;
 import org.apache.hadoop.yarn.event.EventHandler;
 
@@ -44,10 +42,10 @@ public class ReduceTaskAttemptImpl extends TaskAttemptImpl {
       int numMapTasks, JobConf conf,
       TaskAttemptListener taskAttemptListener, OutputCommitter committer,
       Token<JobTokenIdentifier> jobToken,
-      Collection<Token<? extends TokenIdentifier>> fsTokens, Clock clock,
+      Credentials credentials, Clock clock,
       AppContext appContext) {
     super(id, attempt, eventHandler, taskAttemptListener, jobFile, partition,
-        conf, new String[] {}, committer, jobToken, fsTokens, clock,
+        conf, new String[] {}, committer, jobToken, credentials, clock,
         appContext);
     this.numMapTasks = numMapTasks;
   }
 
JobImpl.java
@@ -1066,7 +1066,7 @@ public class JobImpl implements org.apache.hadoop.mapreduce.v2.app.job.Job,
                 job.remoteJobConfFile,
                 job.conf, splits[i],
                 job.taskAttemptListener,
-                job.committer, job.jobToken, job.fsTokens.getAllTokens(),
+                job.committer, job.jobToken, job.fsTokens,
                 job.clock, job.completedTasksFromPreviousRun,
                 job.applicationAttemptId.getAttemptId(),
                 job.metrics, job.appContext);
@@ -1084,7 +1084,7 @@ public class JobImpl implements org.apache.hadoop.mapreduce.v2.app.job.Job,
                 job.remoteJobConfFile,
                 job.conf, job.numMapTasks,
                 job.taskAttemptListener, job.committer, job.jobToken,
-                job.fsTokens.getAllTokens(), job.clock,
+                job.fsTokens, job.clock,
                 job.completedTasksFromPreviousRun,
                 job.applicationAttemptId.getAttemptId(),
                 job.metrics, job.appContext);
MapTaskImpl.java
@@ -18,7 +18,6 @@
 
 package org.apache.hadoop.mapreduce.v2.app.job.impl;
 
-import java.util.Collection;
 import java.util.Map;
 
 import org.apache.hadoop.fs.Path;
@@ -35,8 +34,8 @@ import org.apache.hadoop.mapreduce.v2.api.records.TaskType;
 import org.apache.hadoop.mapreduce.v2.app.AppContext;
 import org.apache.hadoop.mapreduce.v2.app.TaskAttemptListener;
 import org.apache.hadoop.mapreduce.v2.app.metrics.MRAppMetrics;
+import org.apache.hadoop.security.Credentials;
 import org.apache.hadoop.security.token.Token;
-import org.apache.hadoop.security.token.TokenIdentifier;
 import org.apache.hadoop.yarn.Clock;
 import org.apache.hadoop.yarn.event.EventHandler;
 
@@ -50,11 +49,11 @@ public class MapTaskImpl extends TaskImpl {
       TaskSplitMetaInfo taskSplitMetaInfo,
       TaskAttemptListener taskAttemptListener, OutputCommitter committer,
       Token<JobTokenIdentifier> jobToken,
-      Collection<Token<? extends TokenIdentifier>> fsTokens, Clock clock,
+      Credentials credentials, Clock clock,
       Map<TaskId, TaskInfo> completedTasksFromPreviousRun, int startCount,
       MRAppMetrics metrics, AppContext appContext) {
     super(jobId, TaskType.MAP, partition, eventHandler, remoteJobConfFile,
-        conf, taskAttemptListener, committer, jobToken, fsTokens, clock,
+        conf, taskAttemptListener, committer, jobToken, credentials, clock,
         completedTasksFromPreviousRun, startCount, metrics, appContext);
     this.taskSplitMetaInfo = taskSplitMetaInfo;
   }
@@ -69,7 +68,7 @@ public class MapTaskImpl extends TaskImpl {
     return new MapTaskAttemptImpl(getID(), nextAttemptNumber,
         eventHandler, jobFile,
         partition, taskSplitMetaInfo, conf, taskAttemptListener,
-        committer, jobToken, fsTokens, clock, appContext);
+        committer, jobToken, credentials, clock, appContext);
   }
 
   @Override
ReduceTaskImpl.java
@@ -18,7 +18,6 @@
 
 package org.apache.hadoop.mapreduce.v2.app.job.impl;
 
-import java.util.Collection;
 import java.util.Map;
 
 import org.apache.hadoop.fs.Path;
@@ -34,8 +33,8 @@ import org.apache.hadoop.mapreduce.v2.api.records.TaskType;
 import org.apache.hadoop.mapreduce.v2.app.AppContext;
 import org.apache.hadoop.mapreduce.v2.app.TaskAttemptListener;
 import org.apache.hadoop.mapreduce.v2.app.metrics.MRAppMetrics;
+import org.apache.hadoop.security.Credentials;
 import org.apache.hadoop.security.token.Token;
-import org.apache.hadoop.security.token.TokenIdentifier;
 import org.apache.hadoop.yarn.Clock;
 import org.apache.hadoop.yarn.event.EventHandler;
 
@@ -48,11 +47,11 @@ public class ReduceTaskImpl extends TaskImpl {
       EventHandler eventHandler, Path jobFile, JobConf conf,
       int numMapTasks, TaskAttemptListener taskAttemptListener,
       OutputCommitter committer, Token<JobTokenIdentifier> jobToken,
-      Collection<Token<? extends TokenIdentifier>> fsTokens, Clock clock,
+      Credentials credentials, Clock clock,
       Map<TaskId, TaskInfo> completedTasksFromPreviousRun, int startCount,
       MRAppMetrics metrics, AppContext appContext) {
     super(jobId, TaskType.REDUCE, partition, eventHandler, jobFile, conf,
-        taskAttemptListener, committer, jobToken, fsTokens, clock,
+        taskAttemptListener, committer, jobToken, credentials, clock,
         completedTasksFromPreviousRun, startCount, metrics, appContext);
     this.numMapTasks = numMapTasks;
   }
@@ -67,7 +66,7 @@ public class ReduceTaskImpl extends TaskImpl {
     return new ReduceTaskAttemptImpl(getID(), nextAttemptNumber,
         eventHandler, jobFile,
         partition, numMapTasks, conf, taskAttemptListener,
-        committer, jobToken, fsTokens, clock, appContext);
+        committer, jobToken, credentials, clock, appContext);
   }
 
   @Override
TaskAttemptImpl.java
@@ -24,7 +24,6 @@ import java.net.InetSocketAddress;
 import java.net.UnknownHostException;
 import java.nio.ByteBuffer;
 import java.util.ArrayList;
-import java.util.Collection;
 import java.util.EnumSet;
 import java.util.HashMap;
 import java.util.HashSet;
@@ -102,7 +101,6 @@ import org.apache.hadoop.net.NetUtils;
 import org.apache.hadoop.security.Credentials;
 import org.apache.hadoop.security.UserGroupInformation;
 import org.apache.hadoop.security.token.Token;
-import org.apache.hadoop.security.token.TokenIdentifier;
 import org.apache.hadoop.util.StringUtils;
 import org.apache.hadoop.yarn.Clock;
 import org.apache.hadoop.yarn.YarnException;
@@ -157,7 +155,7 @@ public abstract class TaskAttemptImpl implements
   private final Lock readLock;
   private final Lock writeLock;
   private final AppContext appContext;
-  private Collection<Token<? extends TokenIdentifier>> fsTokens;
+  private Credentials credentials;
   private Token<JobTokenIdentifier> jobToken;
   private static AtomicBoolean initialClasspathFlag = new AtomicBoolean();
   private static String initialClasspath = null;
@@ -458,7 +456,7 @@ public abstract class TaskAttemptImpl implements
       TaskAttemptListener taskAttemptListener, Path jobFile, int partition,
       JobConf conf, String[] dataLocalHosts, OutputCommitter committer,
       Token<JobTokenIdentifier> jobToken,
-      Collection<Token<? extends TokenIdentifier>> fsTokens, Clock clock,
+      Credentials credentials, Clock clock,
       AppContext appContext) {
     oldJobId = TypeConverter.fromYarn(taskId.getJobId());
     this.conf = conf;
@@ -477,7 +475,7 @@ public abstract class TaskAttemptImpl implements
     readLock = readWriteLock.readLock();
     writeLock = readWriteLock.writeLock();
 
-    this.fsTokens = fsTokens;
+    this.credentials = credentials;
     this.jobToken = jobToken;
     this.eventHandler = eventHandler;
     this.committer = committer;
@@ -554,7 +552,7 @@ public abstract class TaskAttemptImpl implements
       Map<ApplicationAccessType, String> applicationACLs, Configuration conf,
       Token<JobTokenIdentifier> jobToken,
       final org.apache.hadoop.mapred.JobID oldJobId,
-      Collection<Token<? extends TokenIdentifier>> fsTokens) {
+      Credentials credentials) {
 
     // Application resources
     Map<String, LocalResource> localResources =
@@ -567,7 +565,7 @@ public abstract class TaskAttemptImpl implements
     Map<String, ByteBuffer> serviceData = new HashMap<String, ByteBuffer>();
 
     // Tokens
-    ByteBuffer tokens = ByteBuffer.wrap(new byte[]{});
+    ByteBuffer taskCredentialsBuffer = ByteBuffer.wrap(new byte[]{});
     try {
       FileSystem remoteFS = FileSystem.get(conf);
 
@@ -609,16 +607,14 @@ public abstract class TaskAttemptImpl implements
       // Setup DistributedCache
       MRApps.setupDistributedCache(conf, localResources);
 
-      // Setup up tokens
+      // Setup up task credentials buffer
       Credentials taskCredentials = new Credentials();
 
       if (UserGroupInformation.isSecurityEnabled()) {
-        // Add file-system tokens
-        for (Token<? extends TokenIdentifier> token : fsTokens) {
-          LOG.info("Putting fs-token for NM use for launching container : "
-              + token.toString());
-          taskCredentials.addToken(token.getService(), token);
-        }
+        LOG.info("Adding #" + credentials.numberOfTokens()
+            + " tokens and #" + credentials.numberOfSecretKeys()
+            + " secret keys for NM use for launching container");
+        taskCredentials.addAll(credentials);
       }
 
       // LocalStorageToken is needed irrespective of whether security is enabled
@@ -629,7 +625,7 @@ public abstract class TaskAttemptImpl implements
       LOG.info("Size of containertokens_dob is "
           + taskCredentials.numberOfTokens());
       taskCredentials.writeTokenStorageToStream(containerTokens_dob);
-      tokens =
+      taskCredentialsBuffer =
           ByteBuffer.wrap(containerTokens_dob.getData(), 0,
               containerTokens_dob.getLength());
 
@@ -674,7 +670,8 @@ public abstract class TaskAttemptImpl implements
     ContainerLaunchContext container = BuilderUtils
         .newContainerLaunchContext(null, conf
             .get(MRJobConfig.USER_NAME), null, localResources,
-            environment, null, serviceData, tokens, applicationACLs);
+            environment, null, serviceData, taskCredentialsBuffer,
+            applicationACLs);
 
     return container;
   }
@@ -686,12 +683,12 @@ public abstract class TaskAttemptImpl implements
       final org.apache.hadoop.mapred.JobID oldJobId,
       Resource assignedCapability, WrappedJvmID jvmID,
       TaskAttemptListener taskAttemptListener,
-      Collection<Token<? extends TokenIdentifier>> fsTokens) {
+      Credentials credentials) {
 
     synchronized (commonContainerSpecLock) {
       if (commonContainerSpec == null) {
        commonContainerSpec = createCommonContainerLaunchContext(
-            applicationACLs, conf, jobToken, oldJobId, fsTokens);
+            applicationACLs, conf, jobToken, oldJobId, credentials);
       }
     }
 
@@ -1162,7 +1159,7 @@ public abstract class TaskAttemptImpl implements
           taskAttempt.conf, taskAttempt.jobToken, taskAttempt.remoteTask,
           taskAttempt.oldJobId, taskAttempt.assignedCapability,
           taskAttempt.jvmID, taskAttempt.taskAttemptListener,
-          taskAttempt.fsTokens);
+          taskAttempt.credentials);
       taskAttempt.eventHandler.handle(new ContainerRemoteLaunchEvent(
           taskAttempt.attemptId, taskAttempt.containerID,
           taskAttempt.containerMgrAddress, taskAttempt.containerToken,
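Note: in TaskAttemptImpl above, the merged credentials are serialized once into a ByteBuffer (taskCredentialsBuffer) that rides in the common container launch context, and the launched task deserializes it the same way the new test does. A self-contained round-trip sketch under the same Credentials/IO APIs (class name is illustrative, not from the patch):

import java.nio.ByteBuffer;

import org.apache.hadoop.io.DataInputByteBuffer;
import org.apache.hadoop.io.DataOutputBuffer;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.security.Credentials;

public class CredentialsRoundTrip {
  public static void main(String[] args) throws Exception {
    Credentials creds = new Credentials();
    creds.addSecretKey(new Text("alias"), "payload".getBytes());

    // Serialize as createCommonContainerLaunchContext() does.
    DataOutputBuffer dob = new DataOutputBuffer();
    creds.writeTokenStorageToStream(dob);
    ByteBuffer buffer = ByteBuffer.wrap(dob.getData(), 0, dob.getLength());

    // Deserialize as the launched container (and the new test) does.
    DataInputByteBuffer dibb = new DataInputByteBuffer();
    dibb.reset(buffer);
    Credentials readBack = new Credentials();
    readBack.readTokenStorageStream(dibb);

    System.out.println("secret key survives: "
        + new String(readBack.getSecretKey(new Text("alias"))));
  }
}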
TaskImpl.java
@@ -19,7 +19,6 @@
 package org.apache.hadoop.mapreduce.v2.app.job.impl;
 
 import java.util.ArrayList;
-import java.util.Collection;
 import java.util.Collections;
 import java.util.Comparator;
 import java.util.EnumSet;
@@ -72,8 +71,8 @@ import org.apache.hadoop.mapreduce.v2.app.job.event.TaskTAttemptEvent;
 import org.apache.hadoop.mapreduce.v2.app.metrics.MRAppMetrics;
 import org.apache.hadoop.mapreduce.v2.app.rm.ContainerFailedEvent;
 import org.apache.hadoop.mapreduce.v2.util.MRBuilderUtils;
+import org.apache.hadoop.security.Credentials;
 import org.apache.hadoop.security.token.Token;
-import org.apache.hadoop.security.token.TokenIdentifier;
 import org.apache.hadoop.yarn.Clock;
 import org.apache.hadoop.yarn.event.EventHandler;
 import org.apache.hadoop.yarn.factories.RecordFactory;
@@ -110,7 +109,7 @@ public abstract class TaskImpl implements Task, EventHandler<TaskEvent> {
 
   private final RecordFactory recordFactory = RecordFactoryProvider.getRecordFactory(null);
 
-  protected Collection<Token<? extends TokenIdentifier>> fsTokens;
+  protected Credentials credentials;
   protected Token<JobTokenIdentifier> jobToken;
 
   // counts the number of attempts that are either running or in a state where
@@ -251,7 +250,7 @@ public abstract class TaskImpl implements Task, EventHandler<TaskEvent> {
       EventHandler eventHandler, Path remoteJobConfFile, JobConf conf,
       TaskAttemptListener taskAttemptListener, OutputCommitter committer,
       Token<JobTokenIdentifier> jobToken,
-      Collection<Token<? extends TokenIdentifier>> fsTokens, Clock clock,
+      Credentials credentials, Clock clock,
       Map<TaskId, TaskInfo> completedTasksFromPreviousRun, int startCount,
       MRAppMetrics metrics, AppContext appContext) {
     this.conf = conf;
@@ -270,7 +269,7 @@ public abstract class TaskImpl implements Task, EventHandler<TaskEvent> {
     this.taskAttemptListener = taskAttemptListener;
     this.eventHandler = eventHandler;
     this.committer = committer;
-    this.fsTokens = fsTokens;
+    this.credentials = credentials;
     this.jobToken = jobToken;
     this.metrics = metrics;
     this.appContext = appContext;
TestTaskAttempt.java
@@ -25,6 +25,9 @@ import static org.mockito.Mockito.times;
 import static org.mockito.Mockito.verify;
 import static org.mockito.Mockito.when;
 
+import java.io.IOException;
+import java.net.InetSocketAddress;
+import java.util.Arrays;
 import java.util.HashMap;
 import java.util.Iterator;
 import java.util.Map;
@@ -32,14 +35,23 @@ import java.util.Map;
 import junit.framework.Assert;
 
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.CommonConfigurationKeysPublic;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.RawLocalFileSystem;
+import org.apache.hadoop.io.DataInputByteBuffer;
+import org.apache.hadoop.io.Text;
 import org.apache.hadoop.mapred.JobConf;
 import org.apache.hadoop.mapred.MapTaskAttemptImpl;
+import org.apache.hadoop.mapred.WrappedJvmID;
 import org.apache.hadoop.mapreduce.JobCounter;
 import org.apache.hadoop.mapreduce.MRJobConfig;
 import org.apache.hadoop.mapreduce.OutputCommitter;
+import org.apache.hadoop.mapreduce.TypeConverter;
 import org.apache.hadoop.mapreduce.jobhistory.JobHistoryEvent;
 import org.apache.hadoop.mapreduce.jobhistory.TaskAttemptUnsuccessfulCompletion;
+import org.apache.hadoop.mapreduce.security.token.JobTokenIdentifier;
 import org.apache.hadoop.mapreduce.split.JobSplit.TaskSplitMetaInfo;
 import org.apache.hadoop.mapreduce.v2.api.records.JobId;
 import org.apache.hadoop.mapreduce.v2.api.records.JobState;
@@ -61,21 +73,106 @@ import org.apache.hadoop.mapreduce.v2.app.job.event.TaskAttemptEvent;
 import org.apache.hadoop.mapreduce.v2.app.job.event.TaskAttemptEventType;
 import org.apache.hadoop.mapreduce.v2.app.rm.ContainerRequestEvent;
 import org.apache.hadoop.mapreduce.v2.util.MRBuilderUtils;
+import org.apache.hadoop.security.Credentials;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hadoop.security.token.Token;
+import org.apache.hadoop.security.token.TokenIdentifier;
 import org.apache.hadoop.yarn.Clock;
 import org.apache.hadoop.yarn.ClusterInfo;
 import org.apache.hadoop.yarn.SystemClock;
+import org.apache.hadoop.yarn.api.records.ApplicationAccessType;
 import org.apache.hadoop.yarn.api.records.ApplicationId;
+import org.apache.hadoop.yarn.api.records.ContainerId;
+import org.apache.hadoop.yarn.api.records.ContainerLaunchContext;
+import org.apache.hadoop.yarn.api.records.Resource;
 import org.apache.hadoop.yarn.event.Event;
 import org.apache.hadoop.yarn.event.EventHandler;
 import org.apache.hadoop.yarn.util.BuilderUtils;
 import org.junit.Test;
 import org.mockito.ArgumentCaptor;
 
-import com.sun.source.tree.AssertTree;
-
 @SuppressWarnings("unchecked")
 public class TestTaskAttempt{
 
+  @SuppressWarnings("rawtypes")
+  @Test
+  public void testAttemptContainerRequest() throws Exception {
+    final Text SECRET_KEY_ALIAS = new Text("secretkeyalias");
+    final byte[] SECRET_KEY = ("secretkey").getBytes();
+    Map<ApplicationAccessType, String> acls =
+        new HashMap<ApplicationAccessType, String>(1);
+    acls.put(ApplicationAccessType.VIEW_APP, "otheruser");
+    ApplicationId appId = BuilderUtils.newApplicationId(1, 1);
+    JobId jobId = MRBuilderUtils.newJobId(appId, 1);
+    TaskId taskId = MRBuilderUtils.newTaskId(jobId, 1, TaskType.MAP);
+    Path jobFile = mock(Path.class);
+
+    EventHandler eventHandler = mock(EventHandler.class);
+    TaskAttemptListener taListener = mock(TaskAttemptListener.class);
+    when(taListener.getAddress()).thenReturn(new InetSocketAddress("localhost", 0));
+
+    JobConf jobConf = new JobConf();
+    jobConf.setClass("fs.file.impl", StubbedFS.class, FileSystem.class);
+    jobConf.setBoolean("fs.file.impl.disable.cache", true);
+    jobConf.set(JobConf.MAPRED_MAP_TASK_ENV, "");
+
+    // setup UGI for security so tokens and keys are preserved
+    jobConf.set(CommonConfigurationKeysPublic.HADOOP_SECURITY_AUTHENTICATION, "kerberos");
+    UserGroupInformation.setConfiguration(jobConf);
+
+    Credentials credentials = new Credentials();
+    credentials.addSecretKey(SECRET_KEY_ALIAS, SECRET_KEY);
+    Token<JobTokenIdentifier> jobToken = new Token<JobTokenIdentifier>(
+        ("tokenid").getBytes(), ("tokenpw").getBytes(),
+        new Text("tokenkind"), new Text("tokenservice"));
+
+    TaskAttemptImpl taImpl =
+        new MapTaskAttemptImpl(taskId, 1, eventHandler, jobFile, 1,
+            mock(TaskSplitMetaInfo.class), jobConf, taListener,
+            mock(OutputCommitter.class), jobToken, credentials,
+            new SystemClock(), null);
+
+    jobConf.set(MRJobConfig.APPLICATION_ATTEMPT_ID, taImpl.getID().toString());
+    ContainerId containerId = BuilderUtils.newContainerId(1, 1, 1, 1);
+
+    ContainerLaunchContext launchCtx =
+        TaskAttemptImpl.createContainerLaunchContext(acls, containerId,
+            jobConf, jobToken, taImpl.createRemoteTask(),
+            TypeConverter.fromYarn(jobId), mock(Resource.class),
+            mock(WrappedJvmID.class), taListener,
+            credentials);
+
+    Assert.assertEquals("ACLs mismatch", acls, launchCtx.getApplicationACLs());
+    Credentials launchCredentials = new Credentials();
+
+    DataInputByteBuffer dibb = new DataInputByteBuffer();
+    dibb.reset(launchCtx.getContainerTokens());
+    launchCredentials.readTokenStorageStream(dibb);
+
+    // verify all tokens specified for the task attempt are in the launch context
+    for (Token<? extends TokenIdentifier> token : credentials.getAllTokens()) {
+      Token<? extends TokenIdentifier> launchToken =
+          launchCredentials.getToken(token.getService());
+      Assert.assertNotNull("Token " + token.getService() + " is missing",
+          launchToken);
+      Assert.assertEquals("Token " + token.getService() + " mismatch",
+          token, launchToken);
+    }
+
+    // verify the secret key is in the launch context
+    Assert.assertNotNull("Secret key missing",
+        launchCredentials.getSecretKey(SECRET_KEY_ALIAS));
+    Assert.assertTrue("Secret key mismatch", Arrays.equals(SECRET_KEY,
+        launchCredentials.getSecretKey(SECRET_KEY_ALIAS)));
+  }
+
+  static public class StubbedFS extends RawLocalFileSystem {
+    @Override
+    public FileStatus getFileStatus(Path f) throws IOException {
+      return new FileStatus(1, false, 1, 1, 1, f);
+    }
+  }
+
   @Test
   public void testMRAppHistoryForMap() throws Exception {
     MRApp app = new FailingAttemptsMRApp(1, 0);
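Note: the test flips hadoop.security.authentication to kerberos before building the launch context because TaskAttemptImpl only copies the credentials when UserGroupInformation.isSecurityEnabled() returns true. A small sketch of that toggle (class name is illustrative):

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.CommonConfigurationKeysPublic;
import org.apache.hadoop.security.UserGroupInformation;

public class SecurityToggleDemo {
  public static void main(String[] args) {
    Configuration conf = new Configuration();
    // Without this, isSecurityEnabled() is false and the AM skips
    // copying credentials into the container launch context.
    conf.set(CommonConfigurationKeysPublic.HADOOP_SECURITY_AUTHENTICATION,
        "kerberos");
    UserGroupInformation.setConfiguration(conf);
    System.out.println("security enabled: "
        + UserGroupInformation.isSecurityEnabled());
  }
}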
TestTaskImpl.java
@@ -51,6 +51,7 @@ import org.apache.hadoop.mapreduce.v2.app.job.event.TaskEvent;
 import org.apache.hadoop.mapreduce.v2.app.job.event.TaskEventType;
 import org.apache.hadoop.mapreduce.v2.app.job.event.TaskTAttemptEvent;
 import org.apache.hadoop.mapreduce.v2.app.metrics.MRAppMetrics;
+import org.apache.hadoop.security.Credentials;
 import org.apache.hadoop.security.token.Token;
 import org.apache.hadoop.security.token.TokenIdentifier;
 import org.apache.hadoop.yarn.Clock;
@@ -74,7 +75,7 @@ public class TestTaskImpl {
   private Token<JobTokenIdentifier> jobToken;
   private JobId jobId;
   private Path remoteJobConfFile;
-  private Collection<Token<? extends TokenIdentifier>> fsTokens;
+  private Credentials credentials;
   private Clock clock;
   private Map<TaskId, TaskInfo> completedTasksFromPreviousRun;
   private MRAppMetrics metrics;
@@ -100,12 +101,12 @@ public class TestTaskImpl {
         EventHandler eventHandler, Path remoteJobConfFile, JobConf conf,
         TaskAttemptListener taskAttemptListener, OutputCommitter committer,
         Token<JobTokenIdentifier> jobToken,
-        Collection<Token<? extends TokenIdentifier>> fsTokens, Clock clock,
+        Credentials credentials, Clock clock,
         Map<TaskId, TaskInfo> completedTasksFromPreviousRun, int startCount,
         MRAppMetrics metrics, AppContext appContext) {
       super(jobId, taskType , partition, eventHandler,
           remoteJobConfFile, conf, taskAttemptListener, committer,
-          jobToken, fsTokens, clock,
+          jobToken, credentials, clock,
           completedTasksFromPreviousRun, startCount, metrics, appContext);
     }
 
@@ -118,7 +119,7 @@ public class TestTaskImpl {
     protected TaskAttemptImpl createAttempt() {
       MockTaskAttemptImpl attempt = new MockTaskAttemptImpl(getID(), ++taskAttemptCounter,
           eventHandler, taskAttemptListener, remoteJobConfFile, partition,
-          conf, committer, jobToken, fsTokens, clock, appContext);
+          conf, committer, jobToken, credentials, clock, appContext);
       taskAttempts.add(attempt);
       return attempt;
     }
@@ -140,10 +141,10 @@ public class TestTaskImpl {
         TaskAttemptListener taskAttemptListener, Path jobFile, int partition,
         JobConf conf, OutputCommitter committer,
         Token<JobTokenIdentifier> jobToken,
-        Collection<Token<? extends TokenIdentifier>> fsTokens, Clock clock,
+        Credentials credentials, Clock clock,
         AppContext appContext) {
       super(taskId, id, eventHandler, taskAttemptListener, jobFile, partition, conf,
-          dataLocations, committer, jobToken, fsTokens, clock, appContext);
+          dataLocations, committer, jobToken, credentials, clock, appContext);
       attemptId = Records.newRecord(TaskAttemptId.class);
       attemptId.setId(id);
       attemptId.setTaskId(taskId);
@@ -203,7 +204,7 @@ public class TestTaskImpl {
     committer = mock(OutputCommitter.class);
     jobToken = (Token<JobTokenIdentifier>) mock(Token.class);
     remoteJobConfFile = mock(Path.class);
-    fsTokens = null;
+    credentials = null;
     clock = new SystemClock();
     metrics = mock(MRAppMetrics.class);
     dataLocations = new String[1];
@@ -224,7 +225,7 @@ public class TestTaskImpl {
 
     mockTask = new MockTaskImpl(jobId, partition, dispatcher.getEventHandler(),
         remoteJobConfFile, conf, taskAttemptListener, committer, jobToken,
-        fsTokens, clock,
+        credentials, clock,
         completedTasksFromPreviousRun, startCount,
         metrics, appContext);
 
krb5.conf (new file)
@@ -0,0 +1,28 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+[libdefaults]
+    default_realm = APACHE.ORG
+    udp_preference_limit = 1
+    extra_addresses = 127.0.0.1
+[realms]
+    APACHE.ORG = {
+        admin_server = localhost:88
+        kdc = localhost:88
+    }
+[domain_realm]
+    localhost = APACHE.ORG