MAPREDUCE-3961. Map/ReduceSlotMillis computation incorrect (Siddharth Seth via bobby)

git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/trunk@1297788 13f79535-47bb-0310-9956-ffa450edef68
Robert Joseph Evans 2012-03-06 23:21:13 +00:00
parent bb86825d49
commit c3a4de0ec0
36 changed files with 393 additions and 81 deletions

View File

@ -287,6 +287,9 @@ Release 0.23.2 - UNRELEASED
MAPREDUCE-3976. TestRMContainerAllocator failing (Jason Lowe via bobby)
MAPREDUCE-3961. Map/ReduceSlotMillis computation incorrect (Siddharth Seth
via bobby)
Release 0.23.1 - 2012-02-17
INCOMPATIBLE CHANGES

View File

@ -27,6 +27,7 @@
import org.apache.hadoop.mapreduce.security.token.JobTokenIdentifier;
import org.apache.hadoop.mapreduce.split.JobSplit.TaskSplitMetaInfo;
import org.apache.hadoop.mapreduce.v2.api.records.TaskId;
import org.apache.hadoop.mapreduce.v2.app.AppContext;
import org.apache.hadoop.mapreduce.v2.app.TaskAttemptListener;
import org.apache.hadoop.mapreduce.v2.app.job.impl.TaskAttemptImpl;
import org.apache.hadoop.security.token.Token;
@ -34,7 +35,7 @@
import org.apache.hadoop.yarn.Clock;
import org.apache.hadoop.yarn.event.EventHandler;
@SuppressWarnings({ "rawtypes", "deprecation" })
@SuppressWarnings("rawtypes")
public class MapTaskAttemptImpl extends TaskAttemptImpl {
private final TaskSplitMetaInfo splitInfo;
@ -44,10 +45,11 @@ public MapTaskAttemptImpl(TaskId taskId, int attempt,
int partition, TaskSplitMetaInfo splitInfo, JobConf conf,
TaskAttemptListener taskAttemptListener,
OutputCommitter committer, Token<JobTokenIdentifier> jobToken,
Collection<Token<? extends TokenIdentifier>> fsTokens, Clock clock) {
Collection<Token<? extends TokenIdentifier>> fsTokens, Clock clock,
AppContext appContext) {
super(taskId, attempt, eventHandler,
taskAttemptListener, jobFile, partition, conf, splitInfo.getLocations(),
committer, jobToken, fsTokens, clock);
committer, jobToken, fsTokens, clock, appContext);
this.splitInfo = splitInfo;
}

View File

@ -26,6 +26,7 @@
import org.apache.hadoop.mapreduce.TypeConverter;
import org.apache.hadoop.mapreduce.security.token.JobTokenIdentifier;
import org.apache.hadoop.mapreduce.v2.api.records.TaskId;
import org.apache.hadoop.mapreduce.v2.app.AppContext;
import org.apache.hadoop.mapreduce.v2.app.TaskAttemptListener;
import org.apache.hadoop.mapreduce.v2.app.job.impl.TaskAttemptImpl;
import org.apache.hadoop.security.token.Token;
@ -33,7 +34,7 @@
import org.apache.hadoop.yarn.Clock;
import org.apache.hadoop.yarn.event.EventHandler;
@SuppressWarnings({ "rawtypes", "deprecation" })
@SuppressWarnings("rawtypes")
public class ReduceTaskAttemptImpl extends TaskAttemptImpl {
private final int numMapTasks;
@ -43,9 +44,11 @@ public ReduceTaskAttemptImpl(TaskId id, int attempt,
int numMapTasks, JobConf conf,
TaskAttemptListener taskAttemptListener, OutputCommitter committer,
Token<JobTokenIdentifier> jobToken,
Collection<Token<? extends TokenIdentifier>> fsTokens, Clock clock) {
Collection<Token<? extends TokenIdentifier>> fsTokens, Clock clock,
AppContext appContext) {
super(id, attempt, eventHandler, taskAttemptListener, jobFile, partition,
conf, new String[] {}, committer, jobToken, fsTokens, clock);
conf, new String[] {}, committer, jobToken, fsTokens, clock,
appContext);
this.numMapTasks = numMapTasks;
}

View File

@ -607,13 +607,13 @@ private void setSummarySlotSeconds(JobSummary summary, Counters allCounters) {
Counter slotMillisMapCounter = allCounters
.findCounter(JobCounter.SLOTS_MILLIS_MAPS);
if (slotMillisMapCounter != null) {
summary.setMapSlotSeconds(slotMillisMapCounter.getValue());
summary.setMapSlotSeconds(slotMillisMapCounter.getValue() / 1000);
}
Counter slotMillisReduceCounter = allCounters
.findCounter(JobCounter.SLOTS_MILLIS_REDUCES);
if (slotMillisReduceCounter != null) {
summary.setMapSlotSeconds(slotMillisReduceCounter.getValue());
summary.setReduceSlotSeconds(slotMillisReduceCounter.getValue() / 1000);
}
}
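
The hunk above corrects two problems at once: the JobCounter values are in milliseconds while JobSummary expects seconds, and the old code wrote the reduce counter into the map field. A quick sanity check of the corrected conversion (SlotSecondsCheck is a hypothetical illustration, not part of the commit, and assumes the counters hold milliseconds):

public class SlotSecondsCheck {
    public static void main(String[] args) {
        long slotMillisMaps = 120000L;     // e.g. SLOTS_MILLIS_MAPS counter value
        long slotMillisReduces = 60000L;   // e.g. SLOTS_MILLIS_REDUCES counter value
        // The fixed code divides by 1000 and routes each value to its own field.
        System.out.println(slotMillisMaps / 1000);    // 120 map slot-seconds
        System.out.println(slotMillisReduces / 1000); // 60 reduce slot-seconds
    }
}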

View File

@ -24,6 +24,7 @@
import org.apache.hadoop.mapreduce.v2.api.records.JobId;
import org.apache.hadoop.mapreduce.v2.app.job.Job;
import org.apache.hadoop.yarn.Clock;
import org.apache.hadoop.yarn.ClusterInfo;
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.event.EventHandler;
@ -49,7 +50,10 @@ public interface AppContext {
Map<JobId, Job> getAllJobs();
@SuppressWarnings("rawtypes")
EventHandler getEventHandler();
Clock getClock();
ClusterInfo getClusterInfo();
}

View File

@ -15,20 +15,20 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.mapreduce.v2.app.recover;
package org.apache.hadoop.mapreduce.v2.app;
import org.apache.hadoop.yarn.Clock;
class ControlledClock implements Clock {
public class ControlledClock implements Clock {
private long time = -1;
private final Clock actualClock;
ControlledClock(Clock actualClock) {
public ControlledClock(Clock actualClock) {
this.actualClock = actualClock;
}
synchronized void setTime(long time) {
public synchronized void setTime(long time) {
this.time = time;
}
synchronized void reset() {
public synchronized void reset() {
time = -1;
}
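
ControlledClock is made public here so tests outside the recover package can pin timestamps. A minimal usage sketch (hypothetical ControlledClockExample; it assumes getTime() returns the value set via setTime() until reset() falls back to the wrapped clock):

import org.apache.hadoop.mapreduce.v2.app.ControlledClock;
import org.apache.hadoop.yarn.SystemClock;

public class ControlledClockExample {
    public static void main(String[] args) {
        ControlledClock clock = new ControlledClock(new SystemClock());
        clock.setTime(10);                    // every reader now sees t=10
        long launch = clock.getTime();
        clock.setTime(11);                    // advance time deterministically
        long finish = clock.getTime();
        System.out.println(finish - launch);  // 1 ms, regardless of wall time
        clock.reset();                        // delegate to SystemClock again
    }
}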

View File

@ -91,6 +91,7 @@
import org.apache.hadoop.security.token.TokenIdentifier;
import org.apache.hadoop.util.ReflectionUtils;
import org.apache.hadoop.yarn.Clock;
import org.apache.hadoop.yarn.ClusterInfo;
import org.apache.hadoop.yarn.SystemClock;
import org.apache.hadoop.yarn.YarnException;
import org.apache.hadoop.yarn.api.ApplicationConstants;
@ -449,10 +450,11 @@ protected Recovery createRecoveryService(AppContext appContext) {
protected Job createJob(Configuration conf) {
// create single job
Job newJob = new JobImpl(jobId, appAttemptID, conf, dispatcher
.getEventHandler(), taskAttemptListener, jobTokenSecretManager,
fsTokens, clock, completedTasksFromPreviousRun, metrics, committer,
newApiCommitter, currentUser.getUserName(), appSubmitTime, amInfos);
Job newJob =
new JobImpl(jobId, appAttemptID, conf, dispatcher.getEventHandler(),
taskAttemptListener, jobTokenSecretManager, fsTokens, clock,
completedTasksFromPreviousRun, metrics, committer, newApiCommitter,
currentUser.getUserName(), appSubmitTime, amInfos, context);
((RunningAppContext) context).jobs.put(newJob.getID(), newJob);
dispatcher.register(JobFinishEvent.Type.class,
@ -710,6 +712,7 @@ private class RunningAppContext implements AppContext {
private final Map<JobId, Job> jobs = new ConcurrentHashMap<JobId, Job>();
private final Configuration conf;
private final ClusterInfo clusterInfo = new ClusterInfo();
public RunningAppContext(Configuration config) {
this.conf = config;
@ -759,6 +762,11 @@ public CharSequence getUser() {
public Clock getClock() {
return clock;
}
@Override
public ClusterInfo getClusterInfo() {
return this.clusterInfo;
}
}
@SuppressWarnings("unchecked")

View File

@ -73,6 +73,7 @@
import org.apache.hadoop.mapreduce.v2.api.records.TaskId;
import org.apache.hadoop.mapreduce.v2.api.records.TaskState;
import org.apache.hadoop.mapreduce.v2.api.records.TaskType;
import org.apache.hadoop.mapreduce.v2.app.AppContext;
import org.apache.hadoop.mapreduce.v2.app.TaskAttemptListener;
import org.apache.hadoop.mapreduce.v2.app.job.Task;
import org.apache.hadoop.mapreduce.v2.app.job.event.JobCounterUpdateEvent;
@ -151,6 +152,7 @@ public class JobImpl implements org.apache.hadoop.mapreduce.v2.app.job.Job,
private final String userName;
private final String queueName;
private final long appSubmitTime;
private final AppContext appContext;
private boolean lazyTasksCopyNeeded = false;
volatile Map<TaskId, Task> tasks = new LinkedHashMap<TaskId, Task>();
@ -379,7 +381,7 @@ public JobImpl(JobId jobId, ApplicationAttemptId applicationAttemptId,
Credentials fsTokenCredentials, Clock clock,
Map<TaskId, TaskInfo> completedTasksFromPreviousRun, MRAppMetrics metrics,
OutputCommitter committer, boolean newApiCommitter, String userName,
long appSubmitTime, List<AMInfo> amInfos) {
long appSubmitTime, List<AMInfo> amInfos, AppContext appContext) {
this.applicationAttemptId = applicationAttemptId;
this.jobId = jobId;
this.jobName = conf.get(JobContext.JOB_NAME, "<missing job name>");
@ -388,6 +390,7 @@ public JobImpl(JobId jobId, ApplicationAttemptId applicationAttemptId,
this.clock = clock;
this.completedTasksFromPreviousRun = completedTasksFromPreviousRun;
this.amInfos = amInfos;
this.appContext = appContext;
this.userName = userName;
this.queueName = conf.get(MRJobConfig.QUEUE_NAME, "default");
this.appSubmitTime = appSubmitTime;
@ -1066,7 +1069,7 @@ private void createMapTasks(JobImpl job, long inputLength,
job.committer, job.jobToken, job.fsTokens.getAllTokens(),
job.clock, job.completedTasksFromPreviousRun,
job.applicationAttemptId.getAttemptId(),
job.metrics);
job.metrics, job.appContext);
job.addTask(task);
}
LOG.info("Input size for job " + job.jobId + " = " + inputLength
@ -1084,7 +1087,7 @@ private void createReduceTasks(JobImpl job) {
job.fsTokens.getAllTokens(), job.clock,
job.completedTasksFromPreviousRun,
job.applicationAttemptId.getAttemptId(),
job.metrics);
job.metrics, job.appContext);
job.addTask(task);
}
LOG.info("Number of reduces for job " + job.jobId + " = "

View File

@ -32,6 +32,7 @@
import org.apache.hadoop.mapreduce.v2.api.records.JobId;
import org.apache.hadoop.mapreduce.v2.api.records.TaskId;
import org.apache.hadoop.mapreduce.v2.api.records.TaskType;
import org.apache.hadoop.mapreduce.v2.app.AppContext;
import org.apache.hadoop.mapreduce.v2.app.TaskAttemptListener;
import org.apache.hadoop.mapreduce.v2.app.metrics.MRAppMetrics;
import org.apache.hadoop.security.token.Token;
@ -51,10 +52,10 @@ public MapTaskImpl(JobId jobId, int partition, EventHandler eventHandler,
Token<JobTokenIdentifier> jobToken,
Collection<Token<? extends TokenIdentifier>> fsTokens, Clock clock,
Map<TaskId, TaskInfo> completedTasksFromPreviousRun, int startCount,
MRAppMetrics metrics) {
MRAppMetrics metrics, AppContext appContext) {
super(jobId, TaskType.MAP, partition, eventHandler, remoteJobConfFile,
conf, taskAttemptListener, committer, jobToken, fsTokens, clock,
completedTasksFromPreviousRun, startCount, metrics);
completedTasksFromPreviousRun, startCount, metrics, appContext);
this.taskSplitMetaInfo = taskSplitMetaInfo;
}
@ -68,7 +69,7 @@ protected TaskAttemptImpl createAttempt() {
return new MapTaskAttemptImpl(getID(), nextAttemptNumber,
eventHandler, jobFile,
partition, taskSplitMetaInfo, conf, taskAttemptListener,
committer, jobToken, fsTokens, clock);
committer, jobToken, fsTokens, clock, appContext);
}
@Override

View File

@ -31,6 +31,7 @@
import org.apache.hadoop.mapreduce.v2.api.records.JobId;
import org.apache.hadoop.mapreduce.v2.api.records.TaskId;
import org.apache.hadoop.mapreduce.v2.api.records.TaskType;
import org.apache.hadoop.mapreduce.v2.app.AppContext;
import org.apache.hadoop.mapreduce.v2.app.TaskAttemptListener;
import org.apache.hadoop.mapreduce.v2.app.metrics.MRAppMetrics;
import org.apache.hadoop.security.token.Token;
@ -49,10 +50,10 @@ public ReduceTaskImpl(JobId jobId, int partition,
OutputCommitter committer, Token<JobTokenIdentifier> jobToken,
Collection<Token<? extends TokenIdentifier>> fsTokens, Clock clock,
Map<TaskId, TaskInfo> completedTasksFromPreviousRun, int startCount,
MRAppMetrics metrics) {
MRAppMetrics metrics, AppContext appContext) {
super(jobId, TaskType.REDUCE, partition, eventHandler, jobFile, conf,
taskAttemptListener, committer, jobToken, fsTokens, clock,
completedTasksFromPreviousRun, startCount, metrics);
completedTasksFromPreviousRun, startCount, metrics, appContext);
this.numMapTasks = numMapTasks;
}
@ -66,7 +67,7 @@ protected TaskAttemptImpl createAttempt() {
return new ReduceTaskAttemptImpl(getID(), nextAttemptNumber,
eventHandler, jobFile,
partition, numMapTasks, conf, taskAttemptListener,
committer, jobToken, fsTokens, clock);
committer, jobToken, fsTokens, clock, appContext);
}
@Override

View File

@ -73,6 +73,7 @@
import org.apache.hadoop.mapreduce.v2.api.records.TaskAttemptState;
import org.apache.hadoop.mapreduce.v2.api.records.TaskId;
import org.apache.hadoop.mapreduce.v2.api.records.TaskType;
import org.apache.hadoop.mapreduce.v2.app.AppContext;
import org.apache.hadoop.mapreduce.v2.app.TaskAttemptListener;
import org.apache.hadoop.mapreduce.v2.app.job.event.JobCounterUpdateEvent;
import org.apache.hadoop.mapreduce.v2.app.job.event.JobDiagnosticsUpdateEvent;
@ -128,7 +129,6 @@
import org.apache.hadoop.yarn.util.ConverterUtils;
import org.apache.hadoop.yarn.util.RackResolver;
/**
* Implementation of TaskAttempt interface.
*/
@ -140,8 +140,6 @@ public abstract class TaskAttemptImpl implements
static final Counters EMPTY_COUNTERS = new Counters();
private static final Log LOG = LogFactory.getLog(TaskAttemptImpl.class);
private static final long MEMORY_SPLITS_RESOLUTION = 1024; //TODO Make configurable?
private static final int MAP_MEMORY_MB_DEFAULT = 1024;
private static final int REDUCE_MEMORY_MB_DEFAULT = 1024;
private final static RecordFactory recordFactory = RecordFactoryProvider.getRecordFactory(null);
protected final JobConf conf;
@ -158,6 +156,7 @@ public abstract class TaskAttemptImpl implements
private final List<String> diagnostics = new ArrayList<String>();
private final Lock readLock;
private final Lock writeLock;
private final AppContext appContext;
private Collection<Token<? extends TokenIdentifier>> fsTokens;
private Token<JobTokenIdentifier> jobToken;
private static AtomicBoolean initialClasspathFlag = new AtomicBoolean();
@ -459,7 +458,8 @@ public TaskAttemptImpl(TaskId taskId, int i,
TaskAttemptListener taskAttemptListener, Path jobFile, int partition,
JobConf conf, String[] dataLocalHosts, OutputCommitter committer,
Token<JobTokenIdentifier> jobToken,
Collection<Token<? extends TokenIdentifier>> fsTokens, Clock clock) {
Collection<Token<? extends TokenIdentifier>> fsTokens, Clock clock,
AppContext appContext) {
oldJobId = TypeConverter.fromYarn(taskId.getJobId());
this.conf = conf;
this.clock = clock;
@ -467,6 +467,7 @@ public TaskAttemptImpl(TaskId taskId, int i,
attemptId.setTaskId(taskId);
attemptId.setId(i);
this.taskAttemptListener = taskAttemptListener;
this.appContext = appContext;
// Initialize reportedStatus
reportedStatus = new TaskAttemptStatus();
@ -497,9 +498,13 @@ public TaskAttemptImpl(TaskId taskId, int i,
private int getMemoryRequired(Configuration conf, TaskType taskType) {
int memory = 1024;
if (taskType == TaskType.MAP) {
memory = conf.getInt(MRJobConfig.MAP_MEMORY_MB, MAP_MEMORY_MB_DEFAULT);
memory =
conf.getInt(MRJobConfig.MAP_MEMORY_MB,
MRJobConfig.DEFAULT_MAP_MEMORY_MB);
} else if (taskType == TaskType.REDUCE) {
memory = conf.getInt(MRJobConfig.REDUCE_MEMORY_MB, REDUCE_MEMORY_MB_DEFAULT);
memory =
conf.getInt(MRJobConfig.REDUCE_MEMORY_MB,
MRJobConfig.DEFAULT_REDUCE_MEMORY_MB);
}
return memory;
@ -950,26 +955,26 @@ private void setFinishTime() {
finishTime = clock.getTime();
}
}
private static long computeSlotMillis(TaskAttemptImpl taskAttempt) {
TaskType taskType = taskAttempt.getID().getTaskId().getTaskType();
int slotMemoryReq =
taskAttempt.getMemoryRequired(taskAttempt.conf, taskType);
taskAttempt.getMemoryRequired(taskAttempt.conf, taskType);
int minSlotMemSize =
taskAttempt.appContext.getClusterInfo().getMinContainerCapability()
.getMemory();
int simSlotsRequired =
slotMemoryReq
/ (taskType == TaskType.MAP ? MAP_MEMORY_MB_DEFAULT
: REDUCE_MEMORY_MB_DEFAULT);
// Simulating MRv1 slots for counters by assuming *_MEMORY_MB_DEFAULT
// corresponds to an MRv1 slot.
// Fallow slot millis is not applicable in MRv2 - since a container is
// either assigned with the required memory or is not. No partial
// reservations
minSlotMemSize == 0 ? 0 : (int) Math.ceil((float) slotMemoryReq
/ minSlotMemSize);
long slotMillisIncrement =
simSlotsRequired
* (taskAttempt.getFinishTime() - taskAttempt.getLaunchTime());
return slotMillisIncrement;
}
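
In isolation, the corrected arithmetic looks like the following sketch (SlotMillisSketch is hypothetical, not part of the commit): the number of simulated MRv1 slots is now derived from the cluster's minimum container capability instead of the removed hard-coded 1024 MB defaults, with a guard against a zero minimum.

public class SlotMillisSketch {
    static long slotMillis(int taskMemMb, int minContainerMb,
                           long launchTime, long finishTime) {
        // Round the memory request up to a whole number of minimum-size containers.
        int simSlots = minContainerMb == 0
            ? 0 : (int) Math.ceil((float) taskMemMb / minContainerMb);
        return simSlots * (finishTime - launchTime);
    }

    public static void main(String[] args) {
        // The cases exercised by verifySlotMillis below; each attempt runs 1 ms.
        System.out.println(slotMillis(2048, 1024, 10, 11));  // 2
        System.out.println(slotMillis(2048, 2048, 10, 11));  // 1
        System.out.println(slotMillis(10240, 2048, 10, 11)); // 5
    }
}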
private static JobCounterUpdateEvent createJobCounterUpdateEventTAFailed(
TaskAttemptImpl taskAttempt) {
TaskType taskType = taskAttempt.getID().getTaskId().getTaskType();
@ -1216,8 +1221,8 @@ public void transition(TaskAttemptImpl taskAttempt,
taskAttempt.eventHandler.handle(new JobHistoryEvent(
taskAttempt.attemptId.getTaskId().getJobId(), tauce));
} else {
LOG.debug("Not generating HistoryFinish event since start event not generated for taskAttempt: "
+ taskAttempt.getID());
LOG.debug("Not generating HistoryFinish event since start event not " +
"generated for taskAttempt: " + taskAttempt.getID());
}
}
}
@ -1352,8 +1357,8 @@ public void transition(TaskAttemptImpl taskAttempt, TaskAttemptEvent event) {
// taskAttempt.logAttemptFinishedEvent(TaskAttemptState.FAILED); Not
// handling failed map/reduce events.
}else {
LOG.debug("Not generating HistoryFinish event since start event not generated for taskAttempt: "
+ taskAttempt.getID());
LOG.debug("Not generating HistoryFinish event since start event not " +
"generated for taskAttempt: " + taskAttempt.getID());
}
taskAttempt.eventHandler.handle(new TaskTAttemptEvent(
taskAttempt.attemptId, TaskEventType.T_ATTEMPT_FAILED));
@ -1419,8 +1424,8 @@ public void transition(TaskAttemptImpl taskAttempt, TaskAttemptEvent event) {
taskAttempt.eventHandler.handle(new JobHistoryEvent(
taskAttempt.attemptId.getTaskId().getJobId(), tauce));
}else {
LOG.debug("Not generating HistoryFinish event since start event not generated for taskAttempt: "
+ taskAttempt.getID());
LOG.debug("Not generating HistoryFinish event since start event not " +
"generated for taskAttempt: " + taskAttempt.getID());
}
taskAttempt.eventHandler.handle(new TaskTAttemptEvent(
taskAttempt.attemptId, TaskEventType.T_ATTEMPT_FAILED));
@ -1445,8 +1450,8 @@ public void transition(TaskAttemptImpl taskAttempt,
taskAttempt.eventHandler.handle(new JobHistoryEvent(
taskAttempt.attemptId.getTaskId().getJobId(), tauce));
}else {
LOG.debug("Not generating HistoryFinish event since start event not generated for taskAttempt: "
+ taskAttempt.getID());
LOG.debug("Not generating HistoryFinish event since start event not " +
"generated for taskAttempt: " + taskAttempt.getID());
}
// taskAttempt.logAttemptFinishedEvent(TaskAttemptState.KILLED); Not logging Map/Reduce attempts in case of failure.
taskAttempt.eventHandler.handle(new TaskTAttemptEvent(

View File

@ -54,6 +54,7 @@
import org.apache.hadoop.mapreduce.v2.api.records.TaskReport;
import org.apache.hadoop.mapreduce.v2.api.records.TaskState;
import org.apache.hadoop.mapreduce.v2.api.records.TaskType;
import org.apache.hadoop.mapreduce.v2.app.AppContext;
import org.apache.hadoop.mapreduce.v2.app.TaskAttemptListener;
import org.apache.hadoop.mapreduce.v2.app.job.Task;
import org.apache.hadoop.mapreduce.v2.app.job.TaskAttempt;
@ -104,6 +105,7 @@ public abstract class TaskImpl implements Task, EventHandler<TaskEvent> {
private final Lock readLock;
private final Lock writeLock;
private final MRAppMetrics metrics;
protected final AppContext appContext;
private long scheduledTime;
private final RecordFactory recordFactory = RecordFactoryProvider.getRecordFactory(null);
@ -251,7 +253,7 @@ public TaskImpl(JobId jobId, TaskType taskType, int partition,
Token<JobTokenIdentifier> jobToken,
Collection<Token<? extends TokenIdentifier>> fsTokens, Clock clock,
Map<TaskId, TaskInfo> completedTasksFromPreviousRun, int startCount,
MRAppMetrics metrics) {
MRAppMetrics metrics, AppContext appContext) {
this.conf = conf;
this.clock = clock;
this.jobFile = remoteJobConfFile;
@ -271,6 +273,7 @@ public TaskImpl(JobId jobId, TaskType taskType, int partition,
this.fsTokens = fsTokens;
this.jobToken = jobToken;
this.metrics = metrics;
this.appContext = appContext;
// See if this is from a previous generation.
if (completedTasksFromPreviousRun != null

View File

@ -48,6 +48,7 @@
import org.apache.hadoop.mapreduce.v2.api.records.TaskAttemptState;
import org.apache.hadoop.mapreduce.v2.api.records.TaskId;
import org.apache.hadoop.mapreduce.v2.api.records.TaskState;
import org.apache.hadoop.mapreduce.v2.app.ControlledClock;
import org.apache.hadoop.mapreduce.v2.app.job.event.JobDiagnosticsUpdateEvent;
import org.apache.hadoop.mapreduce.v2.app.job.event.JobEvent;
import org.apache.hadoop.mapreduce.v2.app.job.event.JobEventType;

View File

@ -146,6 +146,10 @@ protected void register() {
scheduler.registerApplicationMaster(request);
minContainerCapability = response.getMinimumResourceCapability();
maxContainerCapability = response.getMaximumResourceCapability();
this.context.getClusterInfo().setMinContainerCapability(
minContainerCapability);
this.context.getClusterInfo().setMaxContainerCapability(
maxContainerCapability);
this.applicationACLs = response.getApplicationACLs();
LOG.info("minContainerCapability: " + minContainerCapability.getMemory());
LOG.info("maxContainerCapability: " + maxContainerCapability.getMemory());

View File

@ -72,13 +72,14 @@
import org.apache.hadoop.security.Credentials;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.yarn.Clock;
import org.apache.hadoop.yarn.ClusterInfo;
import org.apache.hadoop.yarn.SystemClock;
import org.apache.hadoop.yarn.YarnException;
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.Container;
import org.apache.hadoop.yarn.api.records.ContainerId;
import org.apache.hadoop.yarn.api.records.NodeId;
import org.apache.hadoop.yarn.event.Dispatcher;
import org.apache.hadoop.yarn.event.EventHandler;
import org.apache.hadoop.yarn.factories.RecordFactory;
import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
@ -101,7 +102,8 @@ public class MRApp extends MRAppMaster {
private File testWorkDir;
private Path testAbsPath;
private ClusterInfo clusterInfo;
public static String NM_HOST = "localhost";
public static int NM_PORT = 1234;
public static int NM_HTTP_PORT = 8042;
@ -120,6 +122,11 @@ public class MRApp extends MRAppMaster {
applicationId.setId(0);
}
public MRApp(int maps, int reduces, boolean autoComplete, String testName,
boolean cleanOnStart, Clock clock) {
this(maps, reduces, autoComplete, testName, cleanOnStart, 1, clock);
}
public MRApp(int maps, int reduces, boolean autoComplete, String testName,
boolean cleanOnStart) {
this(maps, reduces, autoComplete, testName, cleanOnStart, 1);
@ -149,15 +156,28 @@ private static ContainerId getContainerId(ApplicationId applicationId,
public MRApp(int maps, int reduces, boolean autoComplete, String testName,
boolean cleanOnStart, int startCount) {
this(maps, reduces, autoComplete, testName, cleanOnStart, startCount,
new SystemClock());
}
public MRApp(int maps, int reduces, boolean autoComplete, String testName,
boolean cleanOnStart, int startCount, Clock clock) {
this(getApplicationAttemptId(applicationId, startCount), getContainerId(
applicationId, startCount), maps, reduces, autoComplete, testName,
cleanOnStart, startCount);
cleanOnStart, startCount, clock);
}
public MRApp(ApplicationAttemptId appAttemptId, ContainerId amContainerId,
int maps, int reduces, boolean autoComplete, String testName,
boolean cleanOnStart, int startCount) {
super(appAttemptId, amContainerId, NM_HOST, NM_PORT, NM_HTTP_PORT, System
this(appAttemptId, amContainerId, maps, reduces, autoComplete, testName,
cleanOnStart, startCount, new SystemClock());
}
public MRApp(ApplicationAttemptId appAttemptId, ContainerId amContainerId,
int maps, int reduces, boolean autoComplete, String testName,
boolean cleanOnStart, int startCount, Clock clock) {
super(appAttemptId, amContainerId, NM_HOST, NM_PORT, NM_HTTP_PORT, clock, System
.currentTimeMillis());
this.testWorkDir = new File("target", testName);
testAbsPath = new Path(testWorkDir.getAbsolutePath());
@ -171,12 +191,28 @@ public MRApp(ApplicationAttemptId appAttemptId, ContainerId amContainerId,
throw new YarnException("could not cleanup test dir", e);
}
}
this.maps = maps;
this.reduces = reduces;
this.autoComplete = autoComplete;
}
@Override
public void init(Configuration conf) {
super.init(conf);
if (this.clusterInfo != null) {
getContext().getClusterInfo().setMinContainerCapability(
this.clusterInfo.getMinContainerCapability());
getContext().getClusterInfo().setMaxContainerCapability(
this.clusterInfo.getMaxContainerCapability());
} else {
getContext().getClusterInfo().setMinContainerCapability(
BuilderUtils.newResource(1024));
getContext().getClusterInfo().setMaxContainerCapability(
BuilderUtils.newResource(10240));
}
}
public Job submit(Configuration conf) throws Exception {
String user = conf.get(MRJobConfig.USER_NAME, UserGroupInformation
.getCurrentUser().getShortUserName());
@ -303,7 +339,7 @@ protected Job createJob(Configuration conf) {
getDispatcher().getEventHandler(),
getTaskAttemptListener(), getContext().getClock(),
getCommitter(), isNewApiCommitter(),
currentUser.getUserName());
currentUser.getUserName(), getContext());
((AppContext) getContext()).getAllJobs().put(newJob.getID(), newJob);
getDispatcher().register(JobFinishEvent.Type.class,
@ -391,7 +427,7 @@ protected void attemptLaunched(TaskAttemptId attemptID) {
@Override
protected ContainerAllocator createContainerAllocator(
ClientService clientService, AppContext context) {
ClientService clientService, final AppContext context) {
return new ContainerAllocator(){
private int containerCount;
@Override
@ -447,6 +483,17 @@ public int getHttpPort() {
};
}
public void setClusterInfo(ClusterInfo clusterInfo) {
// Only useful if set before a job is started.
if (getServiceState() == Service.STATE.NOTINITED
|| getServiceState() == Service.STATE.INITED) {
this.clusterInfo = clusterInfo;
} else {
throw new IllegalStateException(
"ClusterInfo can only be set before the App is STARTED");
}
}
class TestJob extends JobImpl {
//override the init transition
private final TestInitTransition initTransition = new TestInitTransition(
@ -470,12 +517,14 @@ protected StateMachine<JobState, JobEventType, JobEvent> getStateMachine() {
public TestJob(JobId jobId, ApplicationAttemptId applicationAttemptId,
Configuration conf, EventHandler eventHandler,
TaskAttemptListener taskAttemptListener, Clock clock,
OutputCommitter committer, boolean newApiCommitter, String user) {
OutputCommitter committer, boolean newApiCommitter, String user,
AppContext appContext) {
super(jobId, getApplicationAttemptId(applicationId, getStartCount()),
conf, eventHandler, taskAttemptListener,
new JobTokenSecretManager(), new Credentials(), clock,
getCompletedTaskFromPreviousRun(), metrics, committer,
newApiCommitter, user, System.currentTimeMillis(), getAllAMInfos());
newApiCommitter, user, System.currentTimeMillis(), getAllAMInfos(),
appContext);
// This "this leak" is okay because the retained pointer is in an
// instance variable.

View File

@ -57,6 +57,7 @@
import org.apache.hadoop.metrics2.lib.DefaultMetricsSystem;
import org.apache.hadoop.net.NetUtils;
import org.apache.hadoop.net.NetworkTopology;
import org.apache.hadoop.yarn.ClusterInfo;
import org.apache.hadoop.yarn.YarnException;
import org.apache.hadoop.yarn.api.AMRMProtocol;
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
@ -1120,6 +1121,9 @@ private static AppContext createAppContext(
when(context.getApplicationID()).thenReturn(appId);
when(context.getApplicationAttemptId()).thenReturn(appAttemptId);
when(context.getJob(isA(JobId.class))).thenReturn(job);
when(context.getClusterInfo()).thenReturn(
new ClusterInfo(BuilderUtils.newResource(1024), BuilderUtils
.newResource(10240)));
when(context.getEventHandler()).thenReturn(new EventHandler() {
@Override
public void handle(Event event) {

View File

@ -61,6 +61,7 @@
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.security.authorize.AccessControlList;
import org.apache.hadoop.yarn.Clock;
import org.apache.hadoop.yarn.ClusterInfo;
import org.apache.hadoop.yarn.SystemClock;
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
import org.apache.hadoop.yarn.api.records.ApplicationId;
@ -825,5 +826,10 @@ public String getApplicationName() {
public long getStartTime() {
return 0;
}
@Override
public ClusterInfo getClusterInfo() {
return new ClusterInfo();
}
}
}

View File

@ -155,7 +155,7 @@ public void testCheckAccess() {
// Verify access
JobImpl job1 = new JobImpl(jobId, null, conf1, null, null, null, null, null,
null, null, null, true, null, 0, null);
null, null, null, true, null, 0, null, null);
Assert.assertTrue(job1.checkAccess(ugi1, JobACL.VIEW_JOB));
Assert.assertFalse(job1.checkAccess(ugi2, JobACL.VIEW_JOB));
@ -166,7 +166,7 @@ public void testCheckAccess() {
// Verify access
JobImpl job2 = new JobImpl(jobId, null, conf2, null, null, null, null, null,
null, null, null, true, null, 0, null);
null, null, null, true, null, 0, null, null);
Assert.assertTrue(job2.checkAccess(ugi1, JobACL.VIEW_JOB));
Assert.assertTrue(job2.checkAccess(ugi2, JobACL.VIEW_JOB));
@ -177,7 +177,7 @@ public void testCheckAccess() {
// Verify access
JobImpl job3 = new JobImpl(jobId, null, conf3, null, null, null, null, null,
null, null, null, true, null, 0, null);
null, null, null, true, null, 0, null, null);
Assert.assertTrue(job3.checkAccess(ugi1, JobACL.VIEW_JOB));
Assert.assertTrue(job3.checkAccess(ugi2, JobACL.VIEW_JOB));
@ -188,7 +188,7 @@ public void testCheckAccess() {
// Verify access
JobImpl job4 = new JobImpl(jobId, null, conf4, null, null, null, null, null,
null, null, null, true, null, 0, null);
null, null, null, true, null, 0, null, null);
Assert.assertTrue(job4.checkAccess(ugi1, JobACL.VIEW_JOB));
Assert.assertTrue(job4.checkAccess(ugi2, JobACL.VIEW_JOB));
@ -199,7 +199,7 @@ public void testCheckAccess() {
// Verify access
JobImpl job5 = new JobImpl(jobId, null, conf5, null, null, null, null, null,
null, null, null, true, null, 0, null);
null, null, null, true, null, 0, null, null);
Assert.assertTrue(job5.checkAccess(ugi1, null));
Assert.assertTrue(job5.checkAccess(ugi2, null));
}

View File

@ -35,6 +35,8 @@
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.MapTaskAttemptImpl;
import org.apache.hadoop.mapreduce.JobCounter;
import org.apache.hadoop.mapreduce.MRJobConfig;
import org.apache.hadoop.mapreduce.OutputCommitter;
import org.apache.hadoop.mapreduce.jobhistory.JobHistoryEvent;
import org.apache.hadoop.mapreduce.jobhistory.TaskAttemptUnsuccessfulCompletion;
@ -48,6 +50,7 @@
import org.apache.hadoop.mapreduce.v2.api.records.TaskState;
import org.apache.hadoop.mapreduce.v2.api.records.TaskType;
import org.apache.hadoop.mapreduce.v2.app.AppContext;
import org.apache.hadoop.mapreduce.v2.app.ControlledClock;
import org.apache.hadoop.mapreduce.v2.app.MRApp;
import org.apache.hadoop.mapreduce.v2.app.TaskAttemptListener;
import org.apache.hadoop.mapreduce.v2.app.job.Job;
@ -59,6 +62,7 @@
import org.apache.hadoop.mapreduce.v2.app.rm.ContainerRequestEvent;
import org.apache.hadoop.mapreduce.v2.util.MRBuilderUtils;
import org.apache.hadoop.yarn.Clock;
import org.apache.hadoop.yarn.ClusterInfo;
import org.apache.hadoop.yarn.SystemClock;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.event.Event;
@ -67,6 +71,8 @@
import org.junit.Test;
import org.mockito.ArgumentCaptor;
import com.sun.source.tree.AssertTree;
@SuppressWarnings("unchecked")
public class TestTaskAttempt{
@ -153,10 +159,76 @@ public void testHostResolveAttempt() throws Exception {
}
assertEquals(0, expected.size());
}
@Test
public void testSlotMillisCounterUpdate() throws Exception {
verifySlotMillis(2048, 2048, 1024);
verifySlotMillis(2048, 1024, 1024);
verifySlotMillis(10240, 1024, 2048);
}
public void verifySlotMillis(int mapMemMb, int reduceMemMb,
int minContainerSize) throws Exception {
Clock actualClock = new SystemClock();
ControlledClock clock = new ControlledClock(actualClock);
clock.setTime(10);
MRApp app =
new MRApp(1, 1, false, "testSlotMillisCounterUpdate", true, clock);
Configuration conf = new Configuration();
conf.setInt(MRJobConfig.MAP_MEMORY_MB, mapMemMb);
conf.setInt(MRJobConfig.REDUCE_MEMORY_MB, reduceMemMb);
app.setClusterInfo(new ClusterInfo(BuilderUtils
.newResource(minContainerSize), BuilderUtils.newResource(10240)));
Job job = app.submit(conf);
app.waitForState(job, JobState.RUNNING);
Map<TaskId, Task> tasks = job.getTasks();
Assert.assertEquals("Num tasks is not correct", 2, tasks.size());
Iterator<Task> taskIter = tasks.values().iterator();
Task mTask = taskIter.next();
app.waitForState(mTask, TaskState.RUNNING);
Task rTask = taskIter.next();
app.waitForState(rTask, TaskState.RUNNING);
Map<TaskAttemptId, TaskAttempt> mAttempts = mTask.getAttempts();
Assert.assertEquals("Num attempts is not correct", 1, mAttempts.size());
Map<TaskAttemptId, TaskAttempt> rAttempts = rTask.getAttempts();
Assert.assertEquals("Num attempts is not correct", 1, rAttempts.size());
TaskAttempt mta = mAttempts.values().iterator().next();
TaskAttempt rta = rAttempts.values().iterator().next();
app.waitForState(mta, TaskAttemptState.RUNNING);
app.waitForState(rta, TaskAttemptState.RUNNING);
clock.setTime(11);
app.getContext()
.getEventHandler()
.handle(new TaskAttemptEvent(mta.getID(), TaskAttemptEventType.TA_DONE));
app.getContext()
.getEventHandler()
.handle(new TaskAttemptEvent(rta.getID(), TaskAttemptEventType.TA_DONE));
app.waitForState(job, JobState.SUCCEEDED);
Assert.assertEquals(mta.getFinishTime(), 11);
Assert.assertEquals(mta.getLaunchTime(), 10);
Assert.assertEquals(rta.getFinishTime(), 11);
Assert.assertEquals(rta.getLaunchTime(), 10);
Assert.assertEquals((int) Math.ceil((float) mapMemMb / minContainerSize),
job.getAllCounters().findCounter(JobCounter.SLOTS_MILLIS_MAPS)
.getValue());
Assert.assertEquals(
(int) Math.ceil((float) reduceMemMb / minContainerSize), job
.getAllCounters().findCounter(JobCounter.SLOTS_MILLIS_REDUCES)
.getValue());
}
@SuppressWarnings("rawtypes")
private TaskAttemptImpl createMapTaskAttemptImplForTest(
EventHandler eventHandler, TaskSplitMetaInfo taskSplitMetaInfo) {
Clock clock = new SystemClock();
return createMapTaskAttemptImplForTest(eventHandler, taskSplitMetaInfo, clock);
}
@SuppressWarnings("rawtypes")
private TaskAttemptImpl createMapTaskAttemptImplForTest(
EventHandler eventHandler, TaskSplitMetaInfo taskSplitMetaInfo, Clock clock) {
ApplicationId appId = BuilderUtils.newApplicationId(1, 1);
JobId jobId = MRBuilderUtils.newJobId(appId, 1);
TaskId taskId = MRBuilderUtils.newTaskId(jobId, 1, TaskType.MAP);
@ -164,11 +236,10 @@ private TaskAttemptImpl createMapTaskAttemptImplForTest(
Path jobFile = mock(Path.class);
JobConf jobConf = new JobConf();
OutputCommitter outputCommitter = mock(OutputCommitter.class);
Clock clock = new SystemClock();
TaskAttemptImpl taImpl =
new MapTaskAttemptImpl(taskId, 1, eventHandler, jobFile, 1,
taskSplitMetaInfo, jobConf, taListener, outputCommitter, null,
null, clock);
null, clock, null);
return taImpl;
}
@ -220,7 +291,7 @@ public void handle(JobHistoryEvent event) {
TaskAttemptUnsuccessfulCompletion datum = (TaskAttemptUnsuccessfulCompletion) event
.getHistoryEvent().getDatum();
Assert.assertEquals("Diagnostic Information is not Correct",
"Test Diagnostic Event", datum.get(6).toString());
"Test Diagnostic Event", datum.get(8).toString());
}
}
};

View File

@ -45,6 +45,7 @@
import org.apache.hadoop.mapreduce.v2.api.records.TaskId;
import org.apache.hadoop.mapreduce.v2.api.records.TaskState;
import org.apache.hadoop.mapreduce.v2.api.records.TaskType;
import org.apache.hadoop.mapreduce.v2.app.AppContext;
import org.apache.hadoop.mapreduce.v2.app.TaskAttemptListener;
import org.apache.hadoop.mapreduce.v2.app.job.event.TaskEvent;
import org.apache.hadoop.mapreduce.v2.app.job.event.TaskEventType;
@ -82,6 +83,7 @@ public class TestTaskImpl {
private TaskSplitMetaInfo taskSplitMetaInfo;
private String[] dataLocations = new String[0];
private final TaskType taskType = TaskType.MAP;
private AppContext appContext;
private int startCount = 0;
private int taskCounter = 0;
@ -100,11 +102,11 @@ public MockTaskImpl(JobId jobId, int partition,
Token<JobTokenIdentifier> jobToken,
Collection<Token<? extends TokenIdentifier>> fsTokens, Clock clock,
Map<TaskId, TaskInfo> completedTasksFromPreviousRun, int startCount,
MRAppMetrics metrics) {
MRAppMetrics metrics, AppContext appContext) {
super(jobId, taskType , partition, eventHandler,
remoteJobConfFile, conf, taskAttemptListener, committer,
jobToken, fsTokens, clock,
completedTasksFromPreviousRun, startCount, metrics);
completedTasksFromPreviousRun, startCount, metrics, appContext);
}
@Override
@ -116,7 +118,7 @@ public TaskType getType() {
protected TaskAttemptImpl createAttempt() {
MockTaskAttemptImpl attempt = new MockTaskAttemptImpl(getID(), ++taskAttemptCounter,
eventHandler, taskAttemptListener, remoteJobConfFile, partition,
conf, committer, jobToken, fsTokens, clock);
conf, committer, jobToken, fsTokens, clock, appContext);
taskAttempts.add(attempt);
return attempt;
}
@ -138,9 +140,10 @@ public MockTaskAttemptImpl(TaskId taskId, int id, EventHandler eventHandler,
TaskAttemptListener taskAttemptListener, Path jobFile, int partition,
JobConf conf, OutputCommitter committer,
Token<JobTokenIdentifier> jobToken,
Collection<Token<? extends TokenIdentifier>> fsTokens, Clock clock) {
Collection<Token<? extends TokenIdentifier>> fsTokens, Clock clock,
AppContext appContext) {
super(taskId, id, eventHandler, taskAttemptListener, jobFile, partition, conf,
dataLocations, committer, jobToken, fsTokens, clock);
dataLocations, committer, jobToken, fsTokens, clock, appContext);
attemptId = Records.newRecord(TaskAttemptId.class);
attemptId.setId(id);
attemptId.setTaskId(taskId);
@ -212,6 +215,7 @@ public void setup() {
jobId = Records.newRecord(JobId.class);
jobId.setId(1);
jobId.setAppId(appId);
appContext = mock(AppContext.class);
taskSplitMetaInfo = mock(TaskSplitMetaInfo.class);
when(taskSplitMetaInfo.getLocations()).thenReturn(dataLocations);
@ -222,7 +226,7 @@ public void setup() {
remoteJobConfFile, conf, taskAttemptListener, committer, jobToken,
fsTokens, clock,
completedTasksFromPreviousRun, startCount,
metrics);
metrics, appContext);
}

View File

@ -34,6 +34,7 @@
import org.apache.hadoop.mapreduce.v2.app.job.TaskAttempt;
import org.apache.hadoop.mapreduce.v2.util.MRApps;
import org.apache.hadoop.yarn.Clock;
import org.apache.hadoop.yarn.ClusterInfo;
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.event.EventHandler;
@ -107,6 +108,11 @@ public String getApplicationName() {
public long getStartTime() {
return startTime;
}
@Override
public ClusterInfo getClusterInfo() {
return null;
}
}
@Test public void testAppControllerIndex() {

View File

@ -35,6 +35,7 @@
import org.apache.hadoop.mapreduce.v2.app.MockJobs;
import org.apache.hadoop.mapreduce.v2.app.job.Job;
import org.apache.hadoop.yarn.Clock;
import org.apache.hadoop.yarn.ClusterInfo;
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.event.EventHandler;
@ -135,6 +136,11 @@ public String getApplicationName() {
public long getStartTime() {
return startTime;
}
@Override
public ClusterInfo getClusterInfo() {
return null;
}
}
private Injector injector = Guice.createInjector(new ServletModule() {

View File

@ -42,6 +42,7 @@
import org.apache.hadoop.mapreduce.v2.app.job.TaskAttempt;
import org.apache.hadoop.mapreduce.v2.util.MRApps;
import org.apache.hadoop.yarn.Clock;
import org.apache.hadoop.yarn.ClusterInfo;
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.event.EventHandler;
@ -145,6 +146,11 @@ public String getApplicationName() {
public long getStartTime() {
return startTime;
}
@Override
public ClusterInfo getClusterInfo() {
return null;
}
}
private Injector injector = Guice.createInjector(new ServletModule() {

View File

@ -44,6 +44,7 @@
import org.apache.hadoop.mapreduce.v2.app.job.Job;
import org.apache.hadoop.mapreduce.v2.util.MRApps;
import org.apache.hadoop.yarn.Clock;
import org.apache.hadoop.yarn.ClusterInfo;
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.event.EventHandler;
@ -146,6 +147,11 @@ public String getApplicationName() {
public long getStartTime() {
return startTime;
}
@Override
public ClusterInfo getClusterInfo() {
return null;
}
}
private Injector injector = Guice.createInjector(new ServletModule() {

View File

@ -43,6 +43,7 @@
import org.apache.hadoop.mapreduce.v2.util.MRApps;
import org.apache.hadoop.security.authorize.AccessControlList;
import org.apache.hadoop.yarn.Clock;
import org.apache.hadoop.yarn.ClusterInfo;
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.event.EventHandler;
@ -148,6 +149,11 @@ public String getApplicationName() {
public long getStartTime() {
return startTime;
}
@Override
public ClusterInfo getClusterInfo() {
return null;
}
}
private Injector injector = Guice.createInjector(new ServletModule() {

View File

@ -40,6 +40,7 @@
import org.apache.hadoop.mapreduce.v2.app.job.Task;
import org.apache.hadoop.mapreduce.v2.util.MRApps;
import org.apache.hadoop.yarn.Clock;
import org.apache.hadoop.yarn.ClusterInfo;
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.event.EventHandler;
@ -142,6 +143,11 @@ public String getApplicationName() {
public long getStartTime() {
return startTime;
}
@Override
public ClusterInfo getClusterInfo() {
return null;
}
}
private Injector injector = Guice.createInjector(new ServletModule() {

View File

@ -175,6 +175,7 @@ public interface MRJobConfig {
public static final String MAP_INPUT_START = "mapreduce.map.input.start";
public static final String MAP_MEMORY_MB = "mapreduce.map.memory.mb";
public static final int DEFAULT_MAP_MEMORY_MB = 1024;
public static final String MAP_MEMORY_PHYSICAL_MB = "mapreduce.map.memory.physical.mb";
@ -225,6 +226,7 @@ public interface MRJobConfig {
public static final String REDUCE_MEMORY_PHYSICAL_MB = "mapreduce.reduce.memory.physical.mb";
public static final String REDUCE_MEMORY_MB = "mapreduce.reduce.memory.mb";
public static final int DEFAULT_REDUCE_MEMORY_MB = 1024;
public static final String REDUCE_MEMORY_TOTAL_BYTES = "mapreduce.reduce.memory.totalbytes";

View File

@ -62,6 +62,7 @@
import org.apache.hadoop.mapreduce.v2.jobhistory.JobHistoryUtils;
import org.apache.hadoop.mapreduce.v2.jobhistory.JobIndexInfo;
import org.apache.hadoop.yarn.Clock;
import org.apache.hadoop.yarn.ClusterInfo;
import org.apache.hadoop.yarn.YarnException;
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
import org.apache.hadoop.yarn.api.records.ApplicationId;
@ -1095,10 +1096,16 @@ public CharSequence getUser() {
}
return userName;
}
//TODO AppContext - Not Required
@Override
public Clock getClock() {
return null;
}
// TODO AppContext - Not Required
@Override
public Clock getClock() {
return null;
}
// TODO AppContext - Not Required
@Override
public ClusterInfo getClusterInfo() {
return null;
}
}

View File

@ -41,6 +41,7 @@
import org.apache.hadoop.mapreduce.v2.app.job.Job;
import org.apache.hadoop.mapreduce.v2.app.webapp.TestAMWebApp;
import org.apache.hadoop.yarn.Clock;
import org.apache.hadoop.yarn.ClusterInfo;
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.event.EventHandler;
@ -117,6 +118,11 @@ public String getApplicationName() {
public long getStartTime() {
return startTime;
}
@Override
public ClusterInfo getClusterInfo() {
return null;
}
}
@Test public void testAppControllerIndex() {

View File

@ -37,6 +37,7 @@
import org.apache.hadoop.mapreduce.v2.hs.JobHistory;
import org.apache.hadoop.util.VersionInfo;
import org.apache.hadoop.yarn.Clock;
import org.apache.hadoop.yarn.ClusterInfo;
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.event.EventHandler;
@ -138,6 +139,11 @@ public String getApplicationName() {
public long getStartTime() {
return startTime;
}
@Override
public ClusterInfo getClusterInfo() {
return null;
}
}
private Injector injector = Guice.createInjector(new ServletModule() {

View File

@ -44,6 +44,7 @@
import org.apache.hadoop.mapreduce.v2.app.job.TaskAttempt;
import org.apache.hadoop.mapreduce.v2.util.MRApps;
import org.apache.hadoop.yarn.Clock;
import org.apache.hadoop.yarn.ClusterInfo;
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.event.EventHandler;
@ -150,6 +151,11 @@ public String getApplicationName() {
public long getStartTime() {
return startTime;
}
@Override
public ClusterInfo getClusterInfo() {
return null;
}
}
private Injector injector = Guice.createInjector(new ServletModule() {

View File

@ -46,6 +46,7 @@
import org.apache.hadoop.mapreduce.v2.app.job.Job;
import org.apache.hadoop.mapreduce.v2.util.MRApps;
import org.apache.hadoop.yarn.Clock;
import org.apache.hadoop.yarn.ClusterInfo;
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.event.EventHandler;
@ -150,6 +151,11 @@ public String getApplicationName() {
public long getStartTime() {
return startTime;
}
@Override
public ClusterInfo getClusterInfo() {
return null;
}
}
private Injector injector = Guice.createInjector(new ServletModule() {

View File

@ -45,6 +45,7 @@
import org.apache.hadoop.mapreduce.v2.hs.MockHistoryJobs.JobsPair;
import org.apache.hadoop.mapreduce.v2.util.MRApps;
import org.apache.hadoop.yarn.Clock;
import org.apache.hadoop.yarn.ClusterInfo;
import org.apache.hadoop.yarn.YarnException;
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
import org.apache.hadoop.yarn.api.records.ApplicationId;
@ -163,6 +164,11 @@ public String getApplicationName() {
public long getStartTime() {
return startTime;
}
@Override
public ClusterInfo getClusterInfo() {
return null;
}
}
private Injector injector = Guice.createInjector(new ServletModule() {

View File

@ -39,6 +39,7 @@
import org.apache.hadoop.mapreduce.v2.hs.MockHistoryJobs.JobsPair;
import org.apache.hadoop.mapreduce.v2.util.MRApps;
import org.apache.hadoop.yarn.Clock;
import org.apache.hadoop.yarn.ClusterInfo;
import org.apache.hadoop.yarn.YarnException;
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
import org.apache.hadoop.yarn.api.records.ApplicationId;
@ -145,6 +146,11 @@ public String getApplicationName() {
public long getStartTime() {
return startTime;
}
@Override
public ClusterInfo getClusterInfo() {
return null;
}
}
private Injector injector = Guice.createInjector(new ServletModule() {

View File

@ -42,6 +42,7 @@
import org.apache.hadoop.mapreduce.v2.app.job.Task;
import org.apache.hadoop.mapreduce.v2.util.MRApps;
import org.apache.hadoop.yarn.Clock;
import org.apache.hadoop.yarn.ClusterInfo;
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.event.EventHandler;
@ -146,6 +147,11 @@ public String getApplicationName() {
public long getStartTime() {
return startTime;
}
@Override
public ClusterInfo getClusterInfo() {
return null;
}
}
private Injector injector = Guice.createInjector(new ServletModule() {

View File

@ -0,0 +1,53 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.yarn;
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.util.Records;
public class ClusterInfo {
private Resource minContainerCapability;
private Resource maxContainerCapability;
public ClusterInfo() {
this.minContainerCapability = Records.newRecord(Resource.class);
this.maxContainerCapability = Records.newRecord(Resource.class);
}
public ClusterInfo(Resource minCapability, Resource maxCapability) {
this.minContainerCapability = minCapability;
this.maxContainerCapability = maxCapability;
}
public Resource getMinContainerCapability() {
return minContainerCapability;
}
public void setMinContainerCapability(Resource minContainerCapability) {
this.minContainerCapability = minContainerCapability;
}
public Resource getMaxContainerCapability() {
return maxContainerCapability;
}
public void setMaxContainerCapability(Resource maxContainerCapability) {
this.maxContainerCapability = maxContainerCapability;
}
}
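
A short construction sketch (hypothetical ClusterInfoExample, using the same BuilderUtils.newResource helper the tests above use), mirroring the 1 GB minimum / 10 GB maximum that MRApp.init() installs by default:

import org.apache.hadoop.yarn.ClusterInfo;
import org.apache.hadoop.yarn.util.BuilderUtils;

public class ClusterInfoExample {
    public static void main(String[] args) {
        ClusterInfo info = new ClusterInfo(
            BuilderUtils.newResource(1024),    // minimum container: 1024 MB
            BuilderUtils.newResource(10240));  // maximum container: 10240 MB
        System.out.println(info.getMinContainerCapability().getMemory()); // 1024
    }
}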