MAPREDUCE-3511. Removed a multitude of cloned/duplicate counters in the AM thereby reducing the AM heap size and preventing full GCs. (vinodkv)

svn merge -c 1229347 trunk


git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/branches/branch-0.23@1229350 13f79535-47bb-0310-9956-ffa450edef68
Vinod Kumar Vavilapalli 2012-01-09 21:07:25 +00:00
parent e508495147
commit 65e6abb477
37 changed files with 584 additions and 405 deletions


@ -108,6 +108,9 @@ Release 0.23.1 - Unreleased
MAPREDUCE-3569. TaskAttemptListener holds a global lock for all
task-updates. (Vinod Kumar Vavilapalli via sseth)
MAPREDUCE-3511. Removed a multitude of cloned/duplicate counters in the AM
thereby reducing the AM heap size and preventing full GCs. (vinodkv)
BUG FIXES
MAPREDUCE-3462. Fix Gridmix JUnit testcase failures.
(Ravi Prakash and Ravi Gummadi via amarrk)


@ -325,9 +325,11 @@ public class TaskAttemptListenerImpl extends CompositeService
taskAttemptStatus.outputSize = taskStatus.getOutputSize();
// Task sends the updated phase to the TT.
taskAttemptStatus.phase = TypeConverter.toYarn(taskStatus.getPhase());
// Counters are updated by the task.
taskAttemptStatus.counters =
TypeConverter.toYarn(taskStatus.getCounters());
// Counters are updated by the task. Convert counters into new format as
// that is the primary storage format inside the AM to avoid multiple
// conversions and unnecessary heap usage.
taskAttemptStatus.counters = new org.apache.hadoop.mapreduce.Counters(
taskStatus.getCounters());
// Map Finish time set by the task (map only)
if (taskStatus.getIsMap() && taskStatus.getMapFinishTime() != 0) {
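The hunk above switches the status update to store counters in the new org.apache.hadoop.mapreduce.Counters format directly. A minimal illustrative sketch of that conversion (not part of this commit; the class name and counter values are made up), using the same copy constructor the patch relies on:

    import org.apache.hadoop.mapreduce.Counters;
    import org.apache.hadoop.mapreduce.TaskCounter;

    public class CounterConversionSketch {
      public static void main(String[] args) {
        // Old-API counters, as a task reports them over the umbilical protocol.
        org.apache.hadoop.mapred.Counters oldCounters =
            new org.apache.hadoop.mapred.Counters();
        oldCounters.findCounter(TaskCounter.MAP_INPUT_RECORDS).increment(42);

        // One conversion into the new-API Counters, the format the AM now keeps
        // internally; no per-update YARN record objects are allocated here.
        Counters amCounters = new Counters(oldCounters);
        System.out.println(
            amCounters.findCounter(TaskCounter.MAP_INPUT_RECORDS).getValue());
      }
    }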


@ -36,10 +36,11 @@ import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.FileUtil;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.permission.FsPermission;
import org.apache.hadoop.mapreduce.Counter;
import org.apache.hadoop.mapreduce.Counters;
import org.apache.hadoop.mapreduce.JobCounter;
import org.apache.hadoop.mapreduce.MRJobConfig;
import org.apache.hadoop.mapreduce.TaskType;
import org.apache.hadoop.mapreduce.v2.api.records.Counter;
import org.apache.hadoop.mapreduce.v2.api.records.JobId;
import org.apache.hadoop.mapreduce.v2.api.records.JobState;
import org.apache.hadoop.mapreduce.v2.app.AppContext;
@ -63,6 +64,8 @@ public class JobHistoryEventHandler extends AbstractService
private final AppContext context;
private final int startCount;
private int eventCounter;
//TODO Does the FS object need to be different ?
private FileSystem stagingDirFS; // log Dir FileSystem
private FileSystem doneDirFS; // done Dir FileSystem
@ -210,6 +213,16 @@ public class JobHistoryEventHandler extends AbstractService
public void run() {
JobHistoryEvent event = null;
while (!stopped && !Thread.currentThread().isInterrupted()) {
// Log the size of the history-event-queue every so often.
if (eventCounter % 1000 == 0) {
eventCounter = 0;
LOG.info("Size of the JobHistory event queue is "
+ eventQueue.size());
} else {
eventCounter++;
}
try {
event = eventQueue.take();
} catch (InterruptedException e) {
@ -238,7 +251,8 @@ public class JobHistoryEventHandler extends AbstractService
@Override
public void stop() {
LOG.info("Stopping JobHistoryEventHandler");
LOG.info("Stopping JobHistoryEventHandler. "
+ "Size of the outstanding queue size is " + eventQueue.size());
stopped = true;
//do not interrupt while event handling is in progress
synchronized(lock) {
@ -483,7 +497,7 @@ public class JobHistoryEventHandler extends AbstractService
.toString());
// TODO JOB_FINISHED does not have state. Effectively job history does not
// have state about the finished job.
setSummarySlotSeconds(summary, jobId);
setSummarySlotSeconds(summary, jfe.getTotalCounters());
break;
case JOB_FAILED:
case JOB_KILLED:
@ -492,21 +506,21 @@ public class JobHistoryEventHandler extends AbstractService
summary.setNumFinishedMaps(context.getJob(jobId).getTotalMaps());
summary.setNumFinishedReduces(context.getJob(jobId).getTotalReduces());
summary.setJobFinishTime(juce.getFinishTime());
setSummarySlotSeconds(summary, jobId);
setSummarySlotSeconds(summary, context.getJob(jobId).getAllCounters());
break;
}
}
private void setSummarySlotSeconds(JobSummary summary, JobId jobId) {
Counter slotMillisMapCounter =
context.getJob(jobId).getCounters()
.getCounter(JobCounter.SLOTS_MILLIS_MAPS);
private void setSummarySlotSeconds(JobSummary summary, Counters allCounters) {
Counter slotMillisMapCounter = allCounters
.findCounter(JobCounter.SLOTS_MILLIS_MAPS);
if (slotMillisMapCounter != null) {
summary.setMapSlotSeconds(slotMillisMapCounter.getValue());
}
Counter slotMillisReduceCounter =
context.getJob(jobId).getCounters()
.getCounter(JobCounter.SLOTS_MILLIS_REDUCES);
Counter slotMillisReduceCounter = allCounters
.findCounter(JobCounter.SLOTS_MILLIS_REDUCES);
if (slotMillisReduceCounter != null) {
summary.setReduceSlotSeconds(slotMillisReduceCounter.getValue());
}
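The refactored setSummarySlotSeconds takes an already-aggregated Counters object instead of re-fetching (and re-cloning) the job's counters per lookup. A small sketch of that pattern (hypothetical helper, not from the patch), assuming the standard JobCounter enum:

    import org.apache.hadoop.mapreduce.Counter;
    import org.apache.hadoop.mapreduce.Counters;
    import org.apache.hadoop.mapreduce.JobCounter;

    public class SlotSecondsSketch {
      // Hypothetical helper mirroring the new shape of setSummarySlotSeconds:
      // the caller hands in aggregated counters, so no extra clone is made here.
      static long getSlotMillis(Counters allCounters, JobCounter key) {
        Counter c = allCounters.findCounter(key);
        return c == null ? 0L : c.getValue();
      }

      public static void main(String[] args) {
        Counters allCounters = new Counters();
        allCounters.findCounter(JobCounter.SLOTS_MILLIS_MAPS).increment(120000);
        System.out.println(getSlotMillis(allCounters, JobCounter.SLOTS_MILLIS_MAPS));
      }
    }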


@ -31,6 +31,7 @@ import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.CommonConfigurationKeysPublic;
import org.apache.hadoop.ipc.Server;
import org.apache.hadoop.mapreduce.MRJobConfig;
import org.apache.hadoop.mapreduce.TypeConverter;
import org.apache.hadoop.mapreduce.v2.api.MRClientProtocol;
import org.apache.hadoop.mapreduce.v2.api.protocolrecords.FailTaskAttemptRequest;
import org.apache.hadoop.mapreduce.v2.api.protocolrecords.FailTaskAttemptResponse;
@ -223,7 +224,7 @@ public class MRClientService extends AbstractService
Job job = verifyAndGetJob(jobId, false);
GetCountersResponse response =
recordFactory.newRecordInstance(GetCountersResponse.class);
response.setCounters(job.getCounters());
response.setCounters(TypeConverter.toYarn(job.getAllCounters()));
return response;
}
@ -238,7 +239,6 @@ public class MRClientService extends AbstractService
return response;
}
@Override
public GetTaskAttemptReportResponse getTaskAttemptReport(
GetTaskAttemptReportRequest request) throws YarnRemoteException {
@ -356,6 +356,8 @@ public class MRClientService extends AbstractService
return response;
}
private final Object getTaskReportsLock = new Object();
@Override
public GetTaskReportsResponse getTaskReports(
GetTaskReportsRequest request) throws YarnRemoteException {
@ -366,12 +368,18 @@ public class MRClientService extends AbstractService
recordFactory.newRecordInstance(GetTaskReportsResponse.class);
Job job = verifyAndGetJob(jobId, false);
LOG.info("Getting task report for " + taskType + " " + jobId);
Collection<Task> tasks = job.getTasks(taskType).values();
LOG.info("Getting task report size " + tasks.size());
LOG.info("Getting task report for " + taskType + " " + jobId
+ ". Report-size will be " + tasks.size());
// Take lock to allow only one call, otherwise heap will blow up because
// of counters in the report when there are multiple callers.
synchronized (getTaskReportsLock) {
for (Task task : tasks) {
response.addTaskReport(task.getReport());
}
}
return response;
}
}
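The getTaskReportsLock above serializes report construction so only one caller at a time materializes per-task counter copies. A minimal sketch of that guard pattern (the Task interface and types here are stand-ins, not the real MR classes):

    // Only one caller at a time builds the reports (each of which clones
    // counters), so the transient heap used by concurrent callers stays bounded.
    public class ReportLockSketch {
      interface Task { Object getReport(); }

      private final Object getTaskReportsLock = new Object();

      public java.util.List<Object> buildReports(java.util.Collection<Task> tasks) {
        java.util.List<Object> reports = new java.util.ArrayList<Object>();
        synchronized (getTaskReportsLock) {
          for (Task task : tasks) {
            reports.add(task.getReport());
          }
        }
        return reports;
      }
    }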


@ -22,9 +22,9 @@ import java.util.List;
import java.util.Map;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.mapreduce.Counters;
import org.apache.hadoop.mapreduce.JobACL;
import org.apache.hadoop.mapreduce.v2.api.records.AMInfo;
import org.apache.hadoop.mapreduce.v2.api.records.Counters;
import org.apache.hadoop.mapreduce.v2.api.records.JobId;
import org.apache.hadoop.mapreduce.v2.api.records.JobReport;
import org.apache.hadoop.mapreduce.v2.api.records.JobState;
@ -44,7 +44,15 @@ public interface Job {
String getName();
JobState getState();
JobReport getReport();
Counters getCounters();
/**
* Get all the counters of this job. This includes job-counters aggregated
* together with the counters of each task. This creates a clone of the
* Counters, so use this judiciously.
* @return job-counters and aggregate task-counters
*/
Counters getAllCounters();
Map<TaskId,Task> getTasks();
Map<TaskId,Task> getTasks(TaskType taskType);
Task getTask(TaskId taskID);
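Because getAllCounters() clones and aggregates on every call, callers are expected to fetch it once and read several values from the copy. An illustrative caller sketch (not from the commit), assuming the AM's Job interface as changed above:

    import org.apache.hadoop.mapreduce.Counters;
    import org.apache.hadoop.mapreduce.JobCounter;

    public class AllCountersUsageSketch {
      static void printSlotMillis(org.apache.hadoop.mapreduce.v2.app.job.Job job) {
        Counters all = job.getAllCounters();   // one clone, not one per lookup
        long maps = all.findCounter(JobCounter.SLOTS_MILLIS_MAPS).getValue();
        long reduces = all.findCounter(JobCounter.SLOTS_MILLIS_REDUCES).getValue();
        System.out.println("map slot-millis=" + maps
            + ", reduce slot-millis=" + reduces);
      }
    }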


@ -20,7 +20,7 @@ package org.apache.hadoop.mapreduce.v2.app.job;
import java.util.Map;
import org.apache.hadoop.mapreduce.v2.api.records.Counters;
import org.apache.hadoop.mapreduce.Counters;
import org.apache.hadoop.mapreduce.v2.api.records.TaskAttemptId;
import org.apache.hadoop.mapreduce.v2.api.records.TaskId;
import org.apache.hadoop.mapreduce.v2.api.records.TaskReport;


@ -20,7 +20,7 @@ package org.apache.hadoop.mapreduce.v2.app.job;
import java.util.List;
import org.apache.hadoop.mapreduce.v2.api.records.Counters;
import org.apache.hadoop.mapreduce.Counters;
import org.apache.hadoop.mapreduce.v2.api.records.TaskAttemptId;
import org.apache.hadoop.mapreduce.v2.api.records.TaskAttemptReport;
import org.apache.hadoop.mapreduce.v2.api.records.TaskAttemptState;


@ -20,12 +20,11 @@ package org.apache.hadoop.mapreduce.v2.app.job.event;
import java.util.List;
import org.apache.hadoop.mapreduce.v2.api.records.Counters;
import org.apache.hadoop.mapreduce.Counters;
import org.apache.hadoop.mapreduce.v2.api.records.Phase;
import org.apache.hadoop.mapreduce.v2.api.records.TaskAttemptId;
import org.apache.hadoop.mapreduce.v2.api.records.TaskAttemptState;
public class TaskAttemptStatusUpdateEvent extends TaskAttemptEvent {
private TaskAttemptStatus reportedTaskAttemptStatus;


@ -41,6 +41,7 @@ import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.JobACLsManager;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapreduce.Counters;
import org.apache.hadoop.mapreduce.JobACL;
import org.apache.hadoop.mapreduce.JobContext;
import org.apache.hadoop.mapreduce.MRJobConfig;
@ -61,9 +62,6 @@ import org.apache.hadoop.mapreduce.split.JobSplit.TaskSplitMetaInfo;
import org.apache.hadoop.mapreduce.split.SplitMetaInfoReader;
import org.apache.hadoop.mapreduce.task.JobContextImpl;
import org.apache.hadoop.mapreduce.v2.api.records.AMInfo;
import org.apache.hadoop.mapreduce.v2.api.records.Counter;
import org.apache.hadoop.mapreduce.v2.api.records.CounterGroup;
import org.apache.hadoop.mapreduce.v2.api.records.Counters;
import org.apache.hadoop.mapreduce.v2.api.records.JobId;
import org.apache.hadoop.mapreduce.v2.api.records.JobReport;
import org.apache.hadoop.mapreduce.v2.api.records.JobState;
@ -99,7 +97,6 @@ import org.apache.hadoop.yarn.Clock;
import org.apache.hadoop.yarn.YarnException;
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
import org.apache.hadoop.yarn.event.EventHandler;
import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
import org.apache.hadoop.yarn.state.InvalidStateTransitonException;
import org.apache.hadoop.yarn.state.MultipleArcTransition;
import org.apache.hadoop.yarn.state.SingleArcTransition;
@ -109,10 +106,13 @@ import org.apache.hadoop.yarn.state.StateMachineFactory;
/** Implementation of Job interface. Maintains the state machines of Job.
* The read and write calls use ReadWriteLock for concurrency.
*/
@SuppressWarnings({ "rawtypes", "deprecation" })
@SuppressWarnings({ "rawtypes", "deprecation", "unchecked" })
public class JobImpl implements org.apache.hadoop.mapreduce.v2.app.job.Job,
EventHandler<JobEvent> {
private static final TaskAttemptCompletionEvent[]
EMPTY_TASK_ATTEMPT_COMPLETION_EVENTS = new TaskAttemptCompletionEvent[0];
private static final Log LOG = LogFactory.getLog(JobImpl.class);
//The maximum fraction of fetch failures allowed for a map
@ -152,7 +152,7 @@ public class JobImpl implements org.apache.hadoop.mapreduce.v2.app.job.Job,
private boolean lazyTasksCopyNeeded = false;
volatile Map<TaskId, Task> tasks = new LinkedHashMap<TaskId, Task>();
private Counters jobCounters = newCounters();
private Counters jobCounters = new Counters();
// FIXME:
//
// Can then replace task-level uber counters (MR-2424) with job-level ones
@ -475,88 +475,29 @@ public class JobImpl implements org.apache.hadoop.mapreduce.v2.app.job.Job,
}
@Override
public Counters getCounters() {
Counters counters = newCounters();
public Counters getAllCounters() {
Counters counters = new Counters();
readLock.lock();
try {
incrAllCounters(counters, jobCounters);
counters.incrAllCounters(jobCounters);
return incrTaskCounters(counters, tasks.values());
} finally {
readLock.unlock();
}
}
private Counters getTypeCounters(Set<TaskId> taskIds) {
Counters counters = newCounters();
for (TaskId taskId : taskIds) {
Task task = tasks.get(taskId);
incrAllCounters(counters, task.getCounters());
}
return counters;
}
private Counters getMapCounters() {
readLock.lock();
try {
return getTypeCounters(mapTasks);
} finally {
readLock.unlock();
}
}
private Counters getReduceCounters() {
readLock.lock();
try {
return getTypeCounters(reduceTasks);
} finally {
readLock.unlock();
}
}
public static Counters newCounters() {
Counters counters = RecordFactoryProvider.getRecordFactory(null)
.newRecordInstance(Counters.class);
return counters;
}
public static Counters incrTaskCounters(Counters counters,
Collection<Task> tasks) {
public static Counters incrTaskCounters(
Counters counters, Collection<Task> tasks) {
for (Task task : tasks) {
incrAllCounters(counters, task.getCounters());
counters.incrAllCounters(task.getCounters());
}
return counters;
}
public static void incrAllCounters(Counters counters, Counters other) {
if (other != null) {
for (CounterGroup otherGroup: other.getAllCounterGroups().values()) {
CounterGroup group = counters.getCounterGroup(otherGroup.getName());
if (group == null) {
group = RecordFactoryProvider.getRecordFactory(null)
.newRecordInstance(CounterGroup.class);
group.setName(otherGroup.getName());
counters.setCounterGroup(group.getName(), group);
}
group.setDisplayName(otherGroup.getDisplayName());
for (Counter otherCounter : otherGroup.getAllCounters().values()) {
Counter counter = group.getCounter(otherCounter.getName());
if (counter == null) {
counter = RecordFactoryProvider.getRecordFactory(null)
.newRecordInstance(Counter.class);
counter.setName(otherCounter.getName());
group.setCounter(counter.getName(), counter);
}
counter.setDisplayName(otherCounter.getDisplayName());
counter.setValue(counter.getValue() + otherCounter.getValue());
}
}
}
}
@Override
public TaskAttemptCompletionEvent[] getTaskAttemptCompletionEvents(
int fromEventId, int maxEvents) {
TaskAttemptCompletionEvent[] events = new TaskAttemptCompletionEvent[0];
TaskAttemptCompletionEvent[] events = EMPTY_TASK_ATTEMPT_COMPLETION_EVENTS;
readLock.lock();
try {
if (taskAttemptCompletionEvents.size() > fromEventId) {
@ -1204,13 +1145,24 @@ public class JobImpl implements org.apache.hadoop.mapreduce.v2.app.job.Job,
// area. May need to create a new event type for this if JobFinished should
// not be generated for KilledJobs, etc.
private static JobFinishedEvent createJobFinishedEvent(JobImpl job) {
Counters mapCounters = new Counters();
Counters reduceCounters = new Counters();
for (Task t : job.tasks.values()) {
Counters counters = t.getCounters();
switch (t.getType()) {
case MAP: mapCounters.incrAllCounters(counters); break;
case REDUCE: reduceCounters.incrAllCounters(counters); break;
}
}
JobFinishedEvent jfe = new JobFinishedEvent(
job.oldJobId, job.finishTime,
job.succeededMapTaskCount, job.succeededReduceTaskCount,
job.failedMapTaskCount, job.failedReduceTaskCount,
TypeConverter.fromYarn(job.getMapCounters()),
TypeConverter.fromYarn(job.getReduceCounters()),
TypeConverter.fromYarn(job.getCounters()));
mapCounters,
reduceCounters,
job.getAllCounters());
return jfe;
}
@ -1450,7 +1402,8 @@ public class JobImpl implements org.apache.hadoop.mapreduce.v2.app.job.Job,
JobCounterUpdateEvent jce = (JobCounterUpdateEvent) event;
for (JobCounterUpdateEvent.CounterIncrementalUpdate ci : jce
.getCounterUpdates()) {
job.jobCounters.incrCounter(ci.getCounterKey(), ci.getIncrementValue());
job.jobCounters.findCounter(ci.getCounterKey()).increment(
ci.getIncrementValue());
}
}
}
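The hand-rolled group/counter merge deleted above is replaced by the new-API Counters.incrAllCounters(). A short sketch of what that buys the AM (illustrative only; counter values are made up):

    import java.util.Arrays;
    import org.apache.hadoop.mapreduce.Counters;
    import org.apache.hadoop.mapreduce.TaskCounter;

    public class IncrAllCountersSketch {
      public static void main(String[] args) {
        Counters task1 = new Counters();
        task1.findCounter(TaskCounter.MAP_OUTPUT_RECORDS).increment(10);
        Counters task2 = new Counters();
        task2.findCounter(TaskCounter.MAP_OUTPUT_RECORDS).increment(5);

        Counters total = new Counters();
        for (Counters c : Arrays.asList(task1, task2)) {
          total.incrAllCounters(c);   // aggregates every group and counter
        }
        System.out.println(
            total.findCounter(TaskCounter.MAP_OUTPUT_RECORDS).getValue()); // 15
      }
    }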


@ -47,6 +47,8 @@ import org.apache.hadoop.mapred.Task;
import org.apache.hadoop.mapred.TaskAttemptContextImpl;
import org.apache.hadoop.mapred.WrappedJvmID;
import org.apache.hadoop.mapred.WrappedProgressSplitsBlock;
import org.apache.hadoop.mapreduce.Counter;
import org.apache.hadoop.mapreduce.Counters;
import org.apache.hadoop.mapreduce.JobCounter;
import org.apache.hadoop.mapreduce.MRJobConfig;
import org.apache.hadoop.mapreduce.OutputCommitter;
@ -60,8 +62,6 @@ import org.apache.hadoop.mapreduce.jobhistory.TaskAttemptStartedEvent;
import org.apache.hadoop.mapreduce.jobhistory.TaskAttemptUnsuccessfulCompletionEvent;
import org.apache.hadoop.mapreduce.security.TokenCache;
import org.apache.hadoop.mapreduce.security.token.JobTokenIdentifier;
import org.apache.hadoop.mapreduce.v2.api.records.Counter;
import org.apache.hadoop.mapreduce.v2.api.records.Counters;
import org.apache.hadoop.mapreduce.v2.api.records.Phase;
import org.apache.hadoop.mapreduce.v2.api.records.TaskAttemptId;
import org.apache.hadoop.mapreduce.v2.api.records.TaskAttemptReport;
@ -132,6 +132,7 @@ public abstract class TaskAttemptImpl implements
org.apache.hadoop.mapreduce.v2.app.job.TaskAttempt,
EventHandler<TaskAttemptEvent> {
static final Counters EMPTY_COUNTERS = new Counters();
private static final Log LOG = LogFactory.getLog(TaskAttemptImpl.class);
private static final long MEMORY_SPLITS_RESOLUTION = 1024; //TODO Make configurable?
private static final int MAP_MEMORY_MB_DEFAULT = 1024;
@ -846,7 +847,7 @@ public abstract class TaskAttemptImpl implements
result.setDiagnosticInfo(StringUtils.join(LINE_SEPARATOR, getDiagnostics()));
result.setPhase(reportedStatus.phase);
result.setStateString(reportedStatus.stateString);
result.setCounters(getCounters());
result.setCounters(TypeConverter.toYarn(getCounters()));
result.setContainerId(this.getAssignedContainerID());
result.setNodeManagerHost(trackerName);
result.setNodeManagerHttpPort(httpPort);
@ -877,7 +878,7 @@ public abstract class TaskAttemptImpl implements
try {
Counters counters = reportedStatus.counters;
if (counters == null) {
counters = recordFactory.newRecordInstance(Counters.class);
counters = EMPTY_COUNTERS;
// counters.groups = new HashMap<String, CounterGroup>();
}
return counters;
@ -1031,22 +1032,21 @@ public abstract class TaskAttemptImpl implements
(int) (now - start));
}
Counter cpuCounter = counters.getCounter(
TaskCounter.CPU_MILLISECONDS);
Counter cpuCounter = counters.findCounter(TaskCounter.CPU_MILLISECONDS);
if (cpuCounter != null && cpuCounter.getValue() <= Integer.MAX_VALUE) {
splitsBlock.getProgressCPUTime().extend(newProgress,
(int) cpuCounter.getValue());
(int) cpuCounter.getValue()); // long to int? TODO: FIX. Same below
}
Counter virtualBytes = counters.getCounter(
TaskCounter.VIRTUAL_MEMORY_BYTES);
Counter virtualBytes = counters
.findCounter(TaskCounter.VIRTUAL_MEMORY_BYTES);
if (virtualBytes != null) {
splitsBlock.getProgressVirtualMemoryKbytes().extend(newProgress,
(int) (virtualBytes.getValue() / (MEMORY_SPLITS_RESOLUTION)));
}
Counter physicalBytes = counters.getCounter(
TaskCounter.PHYSICAL_MEMORY_BYTES);
Counter physicalBytes = counters
.findCounter(TaskCounter.PHYSICAL_MEMORY_BYTES);
if (physicalBytes != null) {
splitsBlock.getProgressPhysicalMemoryKbytes().extend(newProgress,
(int) (physicalBytes.getValue() / (MEMORY_SPLITS_RESOLUTION)));
@ -1343,7 +1343,7 @@ public abstract class TaskAttemptImpl implements
this.containerNodeId == null ? -1 : this.containerNodeId.getPort(),
this.nodeRackName == null ? "UNKNOWN" : this.nodeRackName,
this.reportedStatus.stateString,
TypeConverter.fromYarn(getCounters()),
getCounters(),
getProgressSplitBlock().burst());
eventHandler.handle(
new JobHistoryEvent(attemptId.getTaskId().getJobId(), mfe));
@ -1360,7 +1360,7 @@ public abstract class TaskAttemptImpl implements
this.containerNodeId == null ? -1 : this.containerNodeId.getPort(),
this.nodeRackName == null ? "UNKNOWN" : this.nodeRackName,
this.reportedStatus.stateString,
TypeConverter.fromYarn(getCounters()),
getCounters(),
getProgressSplitBlock().burst());
eventHandler.handle(
new JobHistoryEvent(attemptId.getTaskId().getJobId(), rfe));
@ -1498,8 +1498,8 @@ public abstract class TaskAttemptImpl implements
result.phase = Phase.STARTING;
result.stateString = "NEW";
result.taskState = TaskAttemptState.NEW;
Counters counters = recordFactory.newRecordInstance(Counters.class);
// counters.groups = new HashMap<String, CounterGroup>();
Counters counters = EMPTY_COUNTERS;
// counters.groups = new HashMap<String, CounterGroup>();
result.counters = counters;
}
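EMPTY_COUNTERS above replaces a fresh record allocation for every attempt that has not reported counters yet. A sketch of the shared-default idea (helper name is hypothetical), which is safe only as long as callers treat the shared instance as read-only:

    import org.apache.hadoop.mapreduce.Counters;

    public class EmptyCountersSketch {
      // One static instance stands in for "no counters reported yet".
      static final Counters EMPTY_COUNTERS = new Counters();

      static Counters countersOrEmpty(Counters reported) {
        return reported == null ? EMPTY_COUNTERS : reported;
      }
    }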


@ -33,6 +33,7 @@ import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapreduce.Counters;
import org.apache.hadoop.mapreduce.OutputCommitter;
import org.apache.hadoop.mapreduce.TypeConverter;
import org.apache.hadoop.mapreduce.jobhistory.JobHistoryEvent;
@ -40,7 +41,6 @@ import org.apache.hadoop.mapreduce.jobhistory.TaskFailedEvent;
import org.apache.hadoop.mapreduce.jobhistory.TaskFinishedEvent;
import org.apache.hadoop.mapreduce.jobhistory.TaskStartedEvent;
import org.apache.hadoop.mapreduce.security.token.JobTokenIdentifier;
import org.apache.hadoop.mapreduce.v2.api.records.Counters;
import org.apache.hadoop.mapreduce.v2.api.records.JobId;
import org.apache.hadoop.mapreduce.v2.api.records.TaskAttemptCompletionEvent;
import org.apache.hadoop.mapreduce.v2.api.records.TaskAttemptCompletionEventStatus;
@ -329,7 +329,6 @@ public abstract class TaskImpl implements Task, EventHandler<TaskEvent> {
report.setFinishTime(getFinishTime());
report.setTaskState(getState());
report.setProgress(getProgress());
report.setCounters(getCounters());
for (TaskAttempt attempt : attempts.values()) {
if (TaskAttemptState.RUNNING.equals(attempt.getState())) {
@ -346,6 +345,11 @@ public abstract class TaskImpl implements Task, EventHandler<TaskEvent> {
}
}
// Add a copy of counters as the last step so that their lifetime on heap
// is as small as possible.
report.setCounters(TypeConverter.toYarn(getCounters()));
return report;
} finally {
readLock.unlock();
@ -361,7 +365,7 @@ public abstract class TaskImpl implements Task, EventHandler<TaskEvent> {
if (bestAttempt != null) {
counters = bestAttempt.getCounters();
} else {
counters = recordFactory.newRecordInstance(Counters.class);
counters = TaskAttemptImpl.EMPTY_COUNTERS;
// counters.groups = new HashMap<CharSequence, CounterGroup>();
}
return counters;
@ -595,7 +599,7 @@ public abstract class TaskImpl implements Task, EventHandler<TaskEvent> {
task.getFinishTime(task.successfulAttempt),
TypeConverter.fromYarn(task.taskId.getTaskType()),
taskState.toString(),
TypeConverter.fromYarn(task.getCounters()));
task.getCounters());
return tfe;
}
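The comment above ("add a copy of counters as the last step") is about keeping the converted counter records reachable for as short a time as possible. A generic sketch of that ordering (the Report type here is a stand-in, not the real TaskReport):

    public class LateCountersSketch {
      static class Report { long finishTime; float progress; Object counters; }

      static Report buildReport(long finishTime, float progress, Object countersCopy) {
        Report report = new Report();
        report.finishTime = finishTime;   // cheap scalar fields first
        report.progress = progress;
        report.counters = countersCopy;   // expensive counters copy attached last
        return report;
      }
    }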


@ -92,7 +92,6 @@ import org.apache.hadoop.yarn.util.ConverterUtils;
//TODO:
//task cleanup for all non completed tasks
public class RecoveryService extends CompositeService implements Recovery {
private static final Log LOG = LogFactory.getLog(RecoveryService.class);
@ -411,8 +410,7 @@ public class RecoveryService extends CompositeService implements Recovery {
if (cntrs == null) {
taskAttemptStatus.counters = null;
} else {
taskAttemptStatus.counters = TypeConverter.toYarn(attemptInfo
.getCounters());
taskAttemptStatus.counters = cntrs;
}
actualHandler.handle(new TaskAttemptStatusUpdateEvent(
taskAttemptStatus.id, taskAttemptStatus));


@ -18,25 +18,32 @@
package org.apache.hadoop.mapreduce.v2.app.webapp;
import com.google.inject.Inject;
import static org.apache.hadoop.mapreduce.v2.app.webapp.AMParams.JOB_ID;
import static org.apache.hadoop.mapreduce.v2.app.webapp.AMParams.TASK_ID;
import static org.apache.hadoop.yarn.webapp.view.JQueryUI.C_TABLE;
import static org.apache.hadoop.yarn.webapp.view.JQueryUI._INFO_WRAP;
import java.util.Map;
import org.apache.hadoop.mapreduce.v2.api.records.Counter;
import org.apache.hadoop.mapreduce.v2.api.records.CounterGroup;
import org.apache.hadoop.mapreduce.v2.api.records.Counters;
import org.apache.hadoop.mapreduce.Counter;
import org.apache.hadoop.mapreduce.CounterGroup;
import org.apache.hadoop.mapreduce.Counters;
import org.apache.hadoop.mapreduce.v2.api.records.JobId;
import org.apache.hadoop.mapreduce.v2.api.records.TaskId;
import org.apache.hadoop.mapreduce.v2.app.AppContext;
import org.apache.hadoop.mapreduce.v2.app.job.Job;
import org.apache.hadoop.mapreduce.v2.app.job.Task;
import org.apache.hadoop.mapreduce.v2.app.job.impl.JobImpl;
import org.apache.hadoop.mapreduce.v2.util.MRApps;
import org.apache.hadoop.yarn.webapp.hamlet.Hamlet;
import org.apache.hadoop.yarn.webapp.hamlet.Hamlet.*;
import org.apache.hadoop.yarn.webapp.hamlet.Hamlet.DIV;
import org.apache.hadoop.yarn.webapp.hamlet.Hamlet.TABLE;
import org.apache.hadoop.yarn.webapp.hamlet.Hamlet.TBODY;
import org.apache.hadoop.yarn.webapp.hamlet.Hamlet.TD;
import org.apache.hadoop.yarn.webapp.hamlet.Hamlet.THEAD;
import org.apache.hadoop.yarn.webapp.hamlet.Hamlet.TR;
import org.apache.hadoop.yarn.webapp.view.HtmlBlock;
import static org.apache.hadoop.mapreduce.v2.app.webapp.AMWebApp.*;
import static org.apache.hadoop.yarn.webapp.view.JQueryUI.*;
import com.google.inject.Inject;
public class CountersBlock extends HtmlBlock {
Job job;
@ -62,8 +69,7 @@ public class CountersBlock extends HtmlBlock {
return;
}
if(total == null || total.getAllCounterGroups() == null ||
total.getAllCounterGroups().size() <= 0) {
if(total == null || total.getGroupNames() == null) {
String type = $(TASK_ID);
if(type == null || type.isEmpty()) {
type = $(JOB_ID, "the job");
@ -93,9 +99,9 @@ public class CountersBlock extends HtmlBlock {
th(".group.ui-state-default", "Counter Group").
th(".ui-state-default", "Counters")._()._().
tbody();
for (CounterGroup g : total.getAllCounterGroups().values()) {
CounterGroup mg = map == null ? null : map.getCounterGroup(g.getName());
CounterGroup rg = reduce == null ? null : reduce.getCounterGroup(g.getName());
for (CounterGroup g : total) {
CounterGroup mg = map == null ? null : map.getGroup(g.getName());
CounterGroup rg = reduce == null ? null : reduce.getGroup(g.getName());
++numGroups;
// This is mostly for demonstration :) Typically we'd introduce
// a CounterGroup block to reduce the verbosity. OTOH, this
@ -116,7 +122,7 @@ public class CountersBlock extends HtmlBlock {
TBODY<TABLE<TD<TR<TBODY<TABLE<DIV<Hamlet>>>>>>> group = groupHeadRow.
th(map == null ? "Value" : "Total")._()._().
tbody();
for (Counter counter : g.getAllCounters().values()) {
for (Counter counter : g) {
// Ditto
TR<TBODY<TABLE<TD<TR<TBODY<TABLE<DIV<Hamlet>>>>>>>> groupRow = group.
tr();
@ -130,8 +136,8 @@ public class CountersBlock extends HtmlBlock {
_();
}
if (map != null) {
Counter mc = mg == null ? null : mg.getCounter(counter.getName());
Counter rc = rg == null ? null : rg.getCounter(counter.getName());
Counter mc = mg == null ? null : mg.findCounter(counter.getName());
Counter rc = rg == null ? null : rg.findCounter(counter.getName());
groupRow.
td(mc == null ? "0" : String.valueOf(mc.getValue())).
td(rc == null ? "0" : String.valueOf(rc.getValue()));
@ -173,14 +179,14 @@ public class CountersBlock extends HtmlBlock {
}
// Get all types of counters
Map<TaskId, Task> tasks = job.getTasks();
total = job.getCounters();
map = JobImpl.newCounters();
reduce = JobImpl.newCounters();
total = job.getAllCounters();
map = new Counters();
reduce = new Counters();
for (Task t : tasks.values()) {
Counters counters = t.getCounters();
switch (t.getType()) {
case MAP: JobImpl.incrAllCounters(map, counters); break;
case REDUCE: JobImpl.incrAllCounters(reduce, counters); break;
case MAP: map.incrAllCounters(counters); break;
case REDUCE: reduce.incrAllCounters(counters); break;
}
}
}
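The web blocks now walk counters through the new API's Iterable views instead of materializing getAllCounterGroups() maps. A small illustrative sketch of that iteration style (class name and values are made up):

    import org.apache.hadoop.mapreduce.Counter;
    import org.apache.hadoop.mapreduce.CounterGroup;
    import org.apache.hadoop.mapreduce.Counters;
    import org.apache.hadoop.mapreduce.TaskCounter;

    public class CounterIterationSketch {
      public static void main(String[] args) {
        Counters counters = new Counters();
        counters.findCounter(TaskCounter.CPU_MILLISECONDS).increment(1234);
        // Counters iterates its groups, and each group iterates its counters.
        for (CounterGroup group : counters) {
          for (Counter counter : group) {
            System.out.println(group.getName() + "." + counter.getName()
                + " = " + counter.getValue());
          }
        }
      }
    }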


@ -18,13 +18,18 @@
package org.apache.hadoop.mapreduce.v2.app.webapp;
import com.google.inject.Inject;
import static org.apache.hadoop.mapreduce.v2.app.webapp.AMParams.COUNTER_GROUP;
import static org.apache.hadoop.mapreduce.v2.app.webapp.AMParams.COUNTER_NAME;
import static org.apache.hadoop.mapreduce.v2.app.webapp.AMParams.JOB_ID;
import static org.apache.hadoop.mapreduce.v2.app.webapp.AMParams.TASK_ID;
import static org.apache.hadoop.yarn.webapp.view.JQueryUI._INFO_WRAP;
import java.util.Map;
import java.util.TreeMap;
import org.apache.hadoop.mapreduce.v2.api.records.Counter;
import org.apache.hadoop.mapreduce.v2.api.records.CounterGroup;
import org.apache.hadoop.mapreduce.v2.api.records.Counters;
import org.apache.hadoop.mapreduce.Counter;
import org.apache.hadoop.mapreduce.CounterGroup;
import org.apache.hadoop.mapreduce.Counters;
import org.apache.hadoop.mapreduce.v2.api.records.JobId;
import org.apache.hadoop.mapreduce.v2.api.records.TaskAttemptId;
import org.apache.hadoop.mapreduce.v2.api.records.TaskId;
@ -40,8 +45,7 @@ import org.apache.hadoop.yarn.webapp.hamlet.Hamlet.TBODY;
import org.apache.hadoop.yarn.webapp.hamlet.Hamlet.TR;
import org.apache.hadoop.yarn.webapp.view.HtmlBlock;
import static org.apache.hadoop.mapreduce.v2.app.webapp.AMWebApp.*;
import static org.apache.hadoop.yarn.webapp.view.JQueryUI.*;
import com.google.inject.Inject;
public class SingleCounterBlock extends HtmlBlock {
protected TreeMap<String, Long> values = new TreeMap<String, Long>();
@ -122,10 +126,10 @@ public class SingleCounterBlock extends HtmlBlock {
task.getAttempts().entrySet()) {
long value = 0;
Counters counters = entry.getValue().getCounters();
CounterGroup group = (counters != null)
? counters.getCounterGroup($(COUNTER_GROUP)) : null;
CounterGroup group = (counters != null) ? counters
.getGroup($(COUNTER_GROUP)) : null;
if(group != null) {
Counter c = group.getCounter($(COUNTER_NAME));
Counter c = group.findCounter($(COUNTER_NAME));
if(c != null) {
value = c.getValue();
}
@ -140,9 +144,9 @@ public class SingleCounterBlock extends HtmlBlock {
for(Map.Entry<TaskId, Task> entry : tasks.entrySet()) {
long value = 0;
CounterGroup group = entry.getValue().getCounters()
.getCounterGroup($(COUNTER_GROUP));
.getGroup($(COUNTER_GROUP));
if(group != null) {
Counter c = group.getCounter($(COUNTER_NAME));
Counter c = group.findCounter($(COUNTER_NAME));
if(c != null) {
value = c.getValue();
}


@ -24,8 +24,8 @@ import javax.xml.bind.annotation.XmlAccessorType;
import javax.xml.bind.annotation.XmlElement;
import javax.xml.bind.annotation.XmlRootElement;
import org.apache.hadoop.mapreduce.v2.api.records.Counter;
import org.apache.hadoop.mapreduce.v2.api.records.CounterGroup;
import org.apache.hadoop.mapreduce.Counter;
import org.apache.hadoop.mapreduce.CounterGroup;
@XmlRootElement(name = "counterGroup")
@XmlAccessorType(XmlAccessType.FIELD)
@ -38,14 +38,14 @@ public class CounterGroupInfo {
public CounterGroupInfo() {
}
public CounterGroupInfo(String name, CounterGroup g, CounterGroup mg,
public CounterGroupInfo(String name, CounterGroup group, CounterGroup mg,
CounterGroup rg) {
this.counterGroupName = name;
this.counter = new ArrayList<CounterInfo>();
for (Counter c : g.getAllCounters().values()) {
Counter mc = mg == null ? null : mg.getCounter(c.getName());
Counter rc = rg == null ? null : rg.getCounter(c.getName());
for (Counter c : group) {
Counter mc = mg == null ? null : mg.findCounter(c.getName());
Counter rc = rg == null ? null : rg.findCounter(c.getName());
CounterInfo cinfo = new CounterInfo(c, mc, rc);
this.counter.add(cinfo);
}


@ -21,7 +21,7 @@ import javax.xml.bind.annotation.XmlAccessType;
import javax.xml.bind.annotation.XmlAccessorType;
import javax.xml.bind.annotation.XmlRootElement;
import org.apache.hadoop.mapreduce.v2.api.records.Counter;
import org.apache.hadoop.mapreduce.Counter;
@XmlRootElement
@XmlAccessorType(XmlAccessType.FIELD)
@ -35,9 +35,9 @@ public class CounterInfo {
public CounterInfo() {
}
public CounterInfo(Counter counter, Counter mc, Counter rc) {
this.name = counter.getName();
this.totalCounterValue = counter.getValue();
public CounterInfo(Counter c, Counter mc, Counter rc) {
this.name = c.getName();
this.totalCounterValue = c.getValue();
this.mapCounterValue = mc == null ? 0 : mc.getValue();
this.reduceCounterValue = rc == null ? 0 : rc.getValue();
}


@ -25,13 +25,12 @@ import javax.xml.bind.annotation.XmlAccessorType;
import javax.xml.bind.annotation.XmlRootElement;
import javax.xml.bind.annotation.XmlTransient;
import org.apache.hadoop.mapreduce.v2.api.records.CounterGroup;
import org.apache.hadoop.mapreduce.v2.api.records.Counters;
import org.apache.hadoop.mapreduce.CounterGroup;
import org.apache.hadoop.mapreduce.Counters;
import org.apache.hadoop.mapreduce.v2.api.records.TaskId;
import org.apache.hadoop.mapreduce.v2.app.AppContext;
import org.apache.hadoop.mapreduce.v2.app.job.Job;
import org.apache.hadoop.mapreduce.v2.app.job.Task;
import org.apache.hadoop.mapreduce.v2.app.job.impl.JobImpl;
import org.apache.hadoop.mapreduce.v2.util.MRApps;
@XmlRootElement(name = "jobCounters")
@ -56,18 +55,15 @@ public class JobCounterInfo {
counterGroup = new ArrayList<CounterGroupInfo>();
this.id = MRApps.toString(job.getID());
int numGroups = 0;
if (total != null) {
for (CounterGroup g : total.getAllCounterGroups().values()) {
for (CounterGroup g : total) {
if (g != null) {
CounterGroup mg = map == null ? null : map.getCounterGroup(g
.getName());
CounterGroup rg = reduce == null ? null : reduce.getCounterGroup(g
.getName());
++numGroups;
CounterGroup mg = map == null ? null : map.getGroup(g.getName());
CounterGroup rg = reduce == null ? null : reduce
.getGroup(g.getName());
CounterGroupInfo cginfo = new CounterGroupInfo(g.getName(), g, mg, rg);
CounterGroupInfo cginfo = new CounterGroupInfo(g.getName(), g,
mg, rg);
counterGroup.add(cginfo);
}
}
@ -75,23 +71,23 @@ public class JobCounterInfo {
}
private void getCounters(AppContext ctx, Job job) {
total = JobImpl.newCounters();
total = new Counters();
if (job == null) {
return;
}
map = JobImpl.newCounters();
reduce = JobImpl.newCounters();
map = new Counters();
reduce = new Counters();
// Get all types of counters
Map<TaskId, Task> tasks = job.getTasks();
for (Task t : tasks.values()) {
Counters counters = t.getCounters();
JobImpl.incrAllCounters(total, counters);
total.incrAllCounters(counters);
switch (t.getType()) {
case MAP:
JobImpl.incrAllCounters(map, counters);
map.incrAllCounters(counters);
break;
case REDUCE:
JobImpl.incrAllCounters(reduce, counters);
reduce.incrAllCounters(counters);
break;
}
}


@ -25,8 +25,8 @@ import javax.xml.bind.annotation.XmlAccessorType;
import javax.xml.bind.annotation.XmlRootElement;
import javax.xml.bind.annotation.XmlTransient;
import org.apache.hadoop.mapreduce.v2.api.records.CounterGroup;
import org.apache.hadoop.mapreduce.v2.api.records.Counters;
import org.apache.hadoop.mapreduce.CounterGroup;
import org.apache.hadoop.mapreduce.Counters;
import org.apache.hadoop.mapreduce.v2.app.job.TaskAttempt;
import org.apache.hadoop.mapreduce.v2.util.MRApps;
@ -49,7 +49,7 @@ public class JobTaskAttemptCounterInfo {
total = taskattempt.getCounters();
taskAttemptCounterGroup = new ArrayList<TaskCounterGroupInfo>();
if (total != null) {
for (CounterGroup g : total.getAllCounterGroups().values()) {
for (CounterGroup g : total) {
if (g != null) {
TaskCounterGroupInfo cginfo = new TaskCounterGroupInfo(g.getName(), g);
if (cginfo != null) {


@ -25,8 +25,8 @@ import javax.xml.bind.annotation.XmlAccessorType;
import javax.xml.bind.annotation.XmlRootElement;
import javax.xml.bind.annotation.XmlTransient;
import org.apache.hadoop.mapreduce.v2.api.records.CounterGroup;
import org.apache.hadoop.mapreduce.v2.api.records.Counters;
import org.apache.hadoop.mapreduce.CounterGroup;
import org.apache.hadoop.mapreduce.Counters;
import org.apache.hadoop.mapreduce.v2.app.job.Task;
import org.apache.hadoop.mapreduce.v2.util.MRApps;
@ -48,7 +48,7 @@ public class JobTaskCounterInfo {
this.id = MRApps.toString(task.getID());
taskCounterGroup = new ArrayList<TaskCounterGroupInfo>();
if (total != null) {
for (CounterGroup g : total.getAllCounterGroups().values()) {
for (CounterGroup g : total) {
if (g != null) {
TaskCounterGroupInfo cginfo = new TaskCounterGroupInfo(g.getName(), g);
taskCounterGroup.add(cginfo);


@ -24,8 +24,8 @@ import javax.xml.bind.annotation.XmlAccessType;
import javax.xml.bind.annotation.XmlAccessorType;
import javax.xml.bind.annotation.XmlRootElement;
import org.apache.hadoop.mapreduce.v2.api.records.Counter;
import org.apache.hadoop.mapreduce.v2.api.records.CounterGroup;
import org.apache.hadoop.mapreduce.Counter;
import org.apache.hadoop.mapreduce.CounterGroup;
@XmlRootElement
@XmlAccessorType(XmlAccessType.FIELD)
@ -37,11 +37,11 @@ public class TaskCounterGroupInfo {
public TaskCounterGroupInfo() {
}
public TaskCounterGroupInfo(String name, CounterGroup g) {
public TaskCounterGroupInfo(String name, CounterGroup group) {
this.counterGroupName = name;
this.counter = new ArrayList<TaskCounterInfo>();
for (Counter c : g.getAllCounters().values()) {
for (Counter c : group) {
TaskCounterInfo cinfo = new TaskCounterInfo(c.getName(), c.getValue());
this.counter.add(cinfo);
}


@ -30,6 +30,7 @@ import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.mapred.JobACLsManager;
import org.apache.hadoop.mapred.ShuffleHandler;
import org.apache.hadoop.mapreduce.Counters;
import org.apache.hadoop.mapreduce.FileSystemCounter;
import org.apache.hadoop.mapreduce.JobACL;
import org.apache.hadoop.mapreduce.JobCounter;
@ -37,7 +38,6 @@ import org.apache.hadoop.mapreduce.MRConfig;
import org.apache.hadoop.mapreduce.TaskCounter;
import org.apache.hadoop.mapreduce.TypeConverter;
import org.apache.hadoop.mapreduce.v2.api.records.AMInfo;
import org.apache.hadoop.mapreduce.v2.api.records.Counters;
import org.apache.hadoop.mapreduce.v2.api.records.JobId;
import org.apache.hadoop.mapreduce.v2.api.records.JobReport;
import org.apache.hadoop.mapreduce.v2.api.records.JobState;
@ -144,7 +144,7 @@ public class MockJobs extends MockApps {
report.setFinishTime(System.currentTimeMillis()
+ (int) (Math.random() * DT) + 1);
report.setProgress((float) Math.random());
report.setCounters(newCounters());
report.setCounters(TypeConverter.toYarn(newCounters()));
report.setTaskState(TASK_STATES.next());
return report;
}
@ -159,13 +159,12 @@ public class MockJobs extends MockApps {
report.setPhase(PHASES.next());
report.setTaskAttemptState(TASK_ATTEMPT_STATES.next());
report.setProgress((float) Math.random());
report.setCounters(newCounters());
report.setCounters(TypeConverter.toYarn(newCounters()));
return report;
}
@SuppressWarnings("deprecation")
public static Counters newCounters() {
org.apache.hadoop.mapred.Counters hc = new org.apache.hadoop.mapred.Counters();
Counters hc = new Counters();
for (JobCounter c : JobCounter.values()) {
hc.findCounter(c).setValue((long) (Math.random() * 1000));
}
@ -183,7 +182,7 @@ public class MockJobs extends MockApps {
hc.findCounter(USER_COUNTER_GROUPS.next(), USER_COUNTERS.next())
.setValue((long) (Math.random() * 100000));
}
return TypeConverter.toYarn(hc);
return hc;
}
public static Map<TaskAttemptId, TaskAttempt> newTaskAttempts(TaskId tid,
@ -231,7 +230,10 @@ public class MockJobs extends MockApps {
@Override
public Counters getCounters() {
return report.getCounters();
if (report != null && report.getCounters() != null) {
return new Counters(TypeConverter.fromYarn(report.getCounters()));
}
return null;
}
@Override
@ -327,7 +329,8 @@ public class MockJobs extends MockApps {
@Override
public Counters getCounters() {
return report.getCounters();
return new Counters(
TypeConverter.fromYarn(report.getCounters()));
}
@Override
@ -373,8 +376,9 @@ public class MockJobs extends MockApps {
};
}
public static Counters getCounters(Collection<Task> tasks) {
Counters counters = JobImpl.newCounters();
public static Counters getCounters(
Collection<Task> tasks) {
Counters counters = new Counters();
return JobImpl.incrTaskCounters(counters, tasks);
}
@ -419,7 +423,8 @@ public class MockJobs extends MockApps {
final JobReport report = newJobReport(id);
final Map<TaskId, Task> tasks = newTasks(id, n, m);
final TaskCount taskCount = getTaskCount(tasks.values());
final Counters counters = getCounters(tasks.values());
final Counters counters = getCounters(tasks
.values());
final Path configFile = confFile;
Map<JobACL, AccessControlList> tmpJobACLs = new HashMap<JobACL, AccessControlList>();
@ -457,7 +462,7 @@ public class MockJobs extends MockApps {
}
@Override
public Counters getCounters() {
public Counters getAllCounters() {
return counters;
}


@ -18,9 +18,6 @@
package org.apache.hadoop.mapreduce.v2.app;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
@ -30,11 +27,14 @@ import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.mapreduce.Counters;
import org.apache.hadoop.mapreduce.JobACL;
import org.apache.hadoop.mapreduce.v2.api.records.AMInfo;
import org.apache.hadoop.mapreduce.v2.api.records.Counters;
import org.apache.hadoop.mapreduce.v2.api.records.JobId;
import org.apache.hadoop.mapreduce.v2.api.records.JobReport;
import org.apache.hadoop.mapreduce.v2.api.records.JobState;
@ -46,13 +46,12 @@ import org.apache.hadoop.mapreduce.v2.api.records.TaskId;
import org.apache.hadoop.mapreduce.v2.api.records.TaskReport;
import org.apache.hadoop.mapreduce.v2.api.records.TaskState;
import org.apache.hadoop.mapreduce.v2.api.records.TaskType;
import org.apache.hadoop.mapreduce.v2.app.AppContext;
import org.apache.hadoop.mapreduce.v2.app.job.Job;
import org.apache.hadoop.mapreduce.v2.app.job.Task;
import org.apache.hadoop.mapreduce.v2.app.job.TaskAttempt;
import org.apache.hadoop.mapreduce.v2.app.job.event.TaskAttemptStatusUpdateEvent.TaskAttemptStatus;
import org.apache.hadoop.mapreduce.v2.app.job.event.TaskEvent;
import org.apache.hadoop.mapreduce.v2.app.job.event.TaskEventType;
import org.apache.hadoop.mapreduce.v2.app.job.event.TaskAttemptStatusUpdateEvent.TaskAttemptStatus;
import org.apache.hadoop.mapreduce.v2.app.speculate.DefaultSpeculator;
import org.apache.hadoop.mapreduce.v2.app.speculate.ExponentiallySmoothedTaskRuntimeEstimator;
import org.apache.hadoop.mapreduce.v2.app.speculate.LegacyTaskRuntimeEstimator;
@ -74,7 +73,7 @@ import org.apache.hadoop.yarn.service.CompositeService;
import org.junit.Assert;
import org.junit.Test;
@SuppressWarnings({"unchecked", "rawtypes"})
public class TestRuntimeEstimators {
private static int INITIAL_NUMBER_FREE_SLOTS = 600;
@ -399,7 +398,7 @@ public class TestRuntimeEstimators {
}
@Override
public Counters getCounters() {
public Counters getAllCounters() {
throw new UnsupportedOperationException("Not supported yet.");
}


@ -686,7 +686,7 @@ public class TestAMWebServicesAttempts extends JerseyTest {
assertTrue("name not set", (name != null && !name.isEmpty()));
JSONArray counters = counterGroup.getJSONArray("counter");
for (int j = 0; j < counters.length(); j++) {
JSONObject counter = counters.getJSONObject(i);
JSONObject counter = counters.getJSONObject(j);
String counterName = counter.getString("name");
assertTrue("name not set",
(counterName != null && !counterName.isEmpty()));


@ -774,7 +774,7 @@ public class TestAMWebServicesTasks extends JerseyTest {
assertTrue("name not set", (name != null && !name.isEmpty()));
JSONArray counters = counterGroup.getJSONArray("counter");
for (int j = 0; j < counters.length(); j++) {
JSONObject counter = counters.getJSONObject(i);
JSONObject counter = counters.getJSONObject(j);
String counterName = counter.getString("name");
assertTrue("name not set",
(counterName != null && !counterName.isEmpty()));


@ -22,7 +22,6 @@ import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import org.apache.commons.lang.StringUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.mapred.JobPriority;
import org.apache.hadoop.mapred.TaskCompletionEvent;
@ -45,14 +44,15 @@ import org.apache.hadoop.mapreduce.v2.util.MRApps;
import org.apache.hadoop.yarn.YarnException;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.ApplicationReport;
import org.apache.hadoop.yarn.api.records.YarnApplicationState;
import org.apache.hadoop.yarn.api.records.NodeReport;
import org.apache.hadoop.yarn.api.records.QueueACL;
import org.apache.hadoop.yarn.api.records.QueueState;
import org.apache.hadoop.yarn.api.records.QueueUserACLInfo;
import org.apache.hadoop.yarn.api.records.YarnApplicationState;
import org.apache.hadoop.yarn.factories.RecordFactory;
import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
@SuppressWarnings("deprecation")
public class TypeConverter {
private static RecordFactory recordFactory;


@ -18,15 +18,12 @@
package org.apache.hadoop.mapreduce.jobhistory;
import java.io.IOException;
import org.apache.avro.util.Utf8;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.mapreduce.Counters;
import org.apache.hadoop.mapreduce.JobID;
import org.apache.avro.util.Utf8;
/**
* Event to record successful completion of job
*
@ -34,7 +31,18 @@ import org.apache.avro.util.Utf8;
@InterfaceAudience.Private
@InterfaceStability.Unstable
public class JobFinishedEvent implements HistoryEvent {
private JobFinished datum = new JobFinished();
private JobFinished datum = null;
private JobID jobId;
private long finishTime;
private int finishedMaps;
private int finishedReduces;
private int failedMaps;
private int failedReduces;
private Counters mapCounters;
private Counters reduceCounters;
private Counters totalCounters;
/**
* Create an event to record successful job completion
@ -53,50 +61,75 @@ public class JobFinishedEvent implements HistoryEvent {
int failedMaps, int failedReduces,
Counters mapCounters, Counters reduceCounters,
Counters totalCounters) {
datum.jobid = new Utf8(id.toString());
this.jobId = id;
this.finishTime = finishTime;
this.finishedMaps = finishedMaps;
this.finishedReduces = finishedReduces;
this.failedMaps = failedMaps;
this.failedReduces = failedReduces;
this.mapCounters = mapCounters;
this.reduceCounters = reduceCounters;
this.totalCounters = totalCounters;
}
JobFinishedEvent() {}
public Object getDatum() {
if (datum == null) {
datum = new JobFinished();
datum.jobid = new Utf8(jobId.toString());
datum.finishTime = finishTime;
datum.finishedMaps = finishedMaps;
datum.finishedReduces = finishedReduces;
datum.failedMaps = failedMaps;
datum.failedReduces = failedReduces;
datum.mapCounters =
EventWriter.toAvro(mapCounters, "MAP_COUNTERS");
datum.reduceCounters =
EventWriter.toAvro(reduceCounters, "REDUCE_COUNTERS");
datum.totalCounters =
EventWriter.toAvro(totalCounters, "TOTAL_COUNTERS");
datum.mapCounters = EventWriter.toAvro(mapCounters, "MAP_COUNTERS");
datum.reduceCounters = EventWriter.toAvro(reduceCounters,
"REDUCE_COUNTERS");
datum.totalCounters = EventWriter.toAvro(totalCounters, "TOTAL_COUNTERS");
}
return datum;
}
JobFinishedEvent() {}
public void setDatum(Object oDatum) {
this.datum = (JobFinished) oDatum;
this.jobId = JobID.forName(datum.jobid.toString());
this.finishTime = datum.finishTime;
this.finishedMaps = datum.finishedMaps;
this.finishedReduces = datum.finishedReduces;
this.failedMaps = datum.failedMaps;
this.failedReduces = datum.failedReduces;
this.mapCounters = EventReader.fromAvro(datum.mapCounters);
this.reduceCounters = EventReader.fromAvro(datum.reduceCounters);
this.totalCounters = EventReader.fromAvro(datum.totalCounters);
}
public Object getDatum() { return datum; }
public void setDatum(Object datum) { this.datum = (JobFinished)datum; }
public EventType getEventType() {
return EventType.JOB_FINISHED;
}
/** Get the Job ID */
public JobID getJobid() { return JobID.forName(datum.jobid.toString()); }
public JobID getJobid() { return jobId; }
/** Get the job finish time */
public long getFinishTime() { return datum.finishTime; }
public long getFinishTime() { return finishTime; }
/** Get the number of finished maps for the job */
public int getFinishedMaps() { return datum.finishedMaps; }
public int getFinishedMaps() { return finishedMaps; }
/** Get the number of finished reducers for the job */
public int getFinishedReduces() { return datum.finishedReduces; }
public int getFinishedReduces() { return finishedReduces; }
/** Get the number of failed maps for the job */
public int getFailedMaps() { return datum.failedMaps; }
public int getFailedMaps() { return failedMaps; }
/** Get the number of failed reducers for the job */
public int getFailedReduces() { return datum.failedReduces; }
public int getFailedReduces() { return failedReduces; }
/** Get the counters for the job */
public Counters getTotalCounters() {
return EventReader.fromAvro(datum.totalCounters);
return totalCounters;
}
/** Get the Map counters for the job */
public Counters getMapCounters() {
return EventReader.fromAvro(datum.mapCounters);
return mapCounters;
}
/** Get the reduce counters for the job */
public Counters getReduceCounters() {
return EventReader.fromAvro(datum.reduceCounters);
return reduceCounters;
}
}
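The history events above keep plain Java fields and build the Avro record only when getDatum() is called (typically at serialization time), instead of holding Avro/Utf8 objects for the lifetime of the event. A generic sketch of that lazy-datum pattern (types here are stand-ins, not Hadoop or Avro classes):

    public class LazyDatumSketch {
      static class Datum { String jobId; long finishTime; }   // stand-in for an Avro type

      private Datum datum;          // built on demand
      private final String jobId;   // lightweight fields held up front
      private final long finishTime;

      LazyDatumSketch(String jobId, long finishTime) {
        this.jobId = jobId;
        this.finishTime = finishTime;
      }

      public Datum getDatum() {
        if (datum == null) {        // materialize lazily, once
          datum = new Datum();
          datum.jobId = jobId;
          datum.finishTime = finishTime;
        }
        return datum;
      }
    }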


@ -34,7 +34,24 @@ import org.apache.hadoop.mapreduce.TaskType;
@InterfaceAudience.Private
@InterfaceStability.Unstable
public class MapAttemptFinishedEvent implements HistoryEvent {
private MapAttemptFinished datum = new MapAttemptFinished();
private MapAttemptFinished datum = null;
private TaskAttemptID attemptId;
private TaskType taskType;
private String taskStatus;
private long finishTime;
private String hostname;
private String rackName;
private int port;
private long mapFinishTime;
private String state;
private Counters counters;
int[][] allSplits;
int[] clockSplits;
int[] cpuUsages;
int[] vMemKbytes;
int[] physMemKbytes;
/**
* Create an event for successful completion of map attempts
@ -60,33 +77,21 @@ public class MapAttemptFinishedEvent implements HistoryEvent {
(TaskAttemptID id, TaskType taskType, String taskStatus,
long mapFinishTime, long finishTime, String hostname, int port,
String rackName, String state, Counters counters, int[][] allSplits) {
datum.taskid = new Utf8(id.getTaskID().toString());
datum.attemptId = new Utf8(id.toString());
datum.taskType = new Utf8(taskType.name());
datum.taskStatus = new Utf8(taskStatus);
datum.mapFinishTime = mapFinishTime;
datum.finishTime = finishTime;
datum.hostname = new Utf8(hostname);
datum.port = port;
// This is needed for reading old jh files
if (rackName != null) {
datum.rackname = new Utf8(rackName);
}
datum.state = new Utf8(state);
datum.counters = EventWriter.toAvro(counters);
datum.clockSplits
= AvroArrayUtils.toAvro
(ProgressSplitsBlock.arrayGetWallclockTime(allSplits));
datum.cpuUsages
= AvroArrayUtils.toAvro
(ProgressSplitsBlock.arrayGetCPUTime(allSplits));
datum.vMemKbytes
= AvroArrayUtils.toAvro
(ProgressSplitsBlock.arrayGetVMemKbytes(allSplits));
datum.physMemKbytes
= AvroArrayUtils.toAvro
(ProgressSplitsBlock.arrayGetPhysMemKbytes(allSplits));
this.attemptId = id;
this.taskType = taskType;
this.taskStatus = taskStatus;
this.mapFinishTime = mapFinishTime;
this.finishTime = finishTime;
this.hostname = hostname;
this.rackName = rackName;
this.port = port;
this.state = state;
this.counters = counters;
this.allSplits = allSplits;
this.clockSplits = ProgressSplitsBlock.arrayGetWallclockTime(allSplits);
this.cpuUsages = ProgressSplitsBlock.arrayGetCPUTime(allSplits);
this.vMemKbytes = ProgressSplitsBlock.arrayGetVMemKbytes(allSplits);
this.physMemKbytes = ProgressSplitsBlock.arrayGetPhysMemKbytes(allSplits);
}
/**
@ -117,57 +122,100 @@ public class MapAttemptFinishedEvent implements HistoryEvent {
MapAttemptFinishedEvent() {}
public Object getDatum() { return datum; }
public void setDatum(Object datum) {
this.datum = (MapAttemptFinished)datum;
public Object getDatum() {
if (datum == null) {
datum = new MapAttemptFinished();
datum.taskid = new Utf8(attemptId.getTaskID().toString());
datum.attemptId = new Utf8(attemptId.toString());
datum.taskType = new Utf8(taskType.name());
datum.taskStatus = new Utf8(taskStatus);
datum.mapFinishTime = mapFinishTime;
datum.finishTime = finishTime;
datum.hostname = new Utf8(hostname);
datum.port = port;
if (rackName != null) {
datum.rackname = new Utf8(rackName);
}
datum.state = new Utf8(state);
datum.counters = EventWriter.toAvro(counters);
datum.clockSplits = AvroArrayUtils.toAvro(ProgressSplitsBlock
.arrayGetWallclockTime(allSplits));
datum.cpuUsages = AvroArrayUtils.toAvro(ProgressSplitsBlock
.arrayGetCPUTime(allSplits));
datum.vMemKbytes = AvroArrayUtils.toAvro(ProgressSplitsBlock
.arrayGetVMemKbytes(allSplits));
datum.physMemKbytes = AvroArrayUtils.toAvro(ProgressSplitsBlock
.arrayGetPhysMemKbytes(allSplits));
}
return datum;
}
public void setDatum(Object oDatum) {
this.datum = (MapAttemptFinished)oDatum;
this.attemptId = TaskAttemptID.forName(datum.attemptId.toString());
this.taskType = TaskType.valueOf(datum.taskType.toString());
this.taskStatus = datum.taskStatus.toString();
this.mapFinishTime = datum.mapFinishTime;
this.finishTime = datum.finishTime;
this.hostname = datum.hostname.toString();
this.rackName = datum.rackname.toString();
this.port = datum.port;
this.state = datum.state.toString();
this.counters = EventReader.fromAvro(datum.counters);
this.clockSplits = AvroArrayUtils.fromAvro(datum.clockSplits);
this.cpuUsages = AvroArrayUtils.fromAvro(datum.cpuUsages);
this.vMemKbytes = AvroArrayUtils.fromAvro(datum.vMemKbytes);
this.physMemKbytes = AvroArrayUtils.fromAvro(datum.physMemKbytes);
}
/** Get the task ID */
public TaskID getTaskId() { return TaskID.forName(datum.taskid.toString()); }
public TaskID getTaskId() { return attemptId.getTaskID(); }
/** Get the attempt id */
public TaskAttemptID getAttemptId() {
return TaskAttemptID.forName(datum.attemptId.toString());
return attemptId;
}
/** Get the task type */
public TaskType getTaskType() {
return TaskType.valueOf(datum.taskType.toString());
return TaskType.valueOf(taskType.toString());
}
/** Get the task status */
public String getTaskStatus() { return datum.taskStatus.toString(); }
public String getTaskStatus() { return taskStatus.toString(); }
/** Get the map phase finish time */
public long getMapFinishTime() { return datum.mapFinishTime; }
public long getMapFinishTime() { return mapFinishTime; }
/** Get the attempt finish time */
public long getFinishTime() { return datum.finishTime; }
public long getFinishTime() { return finishTime; }
/** Get the host name */
public String getHostname() { return datum.hostname.toString(); }
public String getHostname() { return hostname.toString(); }
/** Get the tracker rpc port */
public int getPort() { return datum.port; }
public int getPort() { return port; }
/** Get the rack name */
public String getRackName() {
return datum.rackname == null ? null : datum.rackname.toString();
return rackName == null ? null : rackName.toString();
}
/** Get the state string */
public String getState() { return datum.state.toString(); }
public String getState() { return state.toString(); }
/** Get the counters */
Counters getCounters() { return EventReader.fromAvro(datum.counters); }
Counters getCounters() { return counters; }
/** Get the event type */
public EventType getEventType() {
return EventType.MAP_ATTEMPT_FINISHED;
}
public int[] getClockSplits() {
return AvroArrayUtils.fromAvro(datum.clockSplits);
return clockSplits;
}
public int[] getCpuUsages() {
return AvroArrayUtils.fromAvro(datum.cpuUsages);
return cpuUsages;
}
public int[] getVMemKbytes() {
return AvroArrayUtils.fromAvro(datum.vMemKbytes);
return vMemKbytes;
}
public int[] getPhysMemKbytes() {
return AvroArrayUtils.fromAvro(datum.physMemKbytes);
return physMemKbytes;
}
}


@ -34,8 +34,25 @@ import org.apache.hadoop.mapreduce.TaskType;
@InterfaceAudience.Private
@InterfaceStability.Unstable
public class ReduceAttemptFinishedEvent implements HistoryEvent {
private ReduceAttemptFinished datum =
new ReduceAttemptFinished();
private ReduceAttemptFinished datum = null;
private TaskAttemptID attemptId;
private TaskType taskType;
private String taskStatus;
private long shuffleFinishTime;
private long sortFinishTime;
private long finishTime;
private String hostname;
private String rackName;
private int port;
private String state;
private Counters counters;
int[][] allSplits;
int[] clockSplits;
int[] cpuUsages;
int[] vMemKbytes;
int[] physMemKbytes;
/**
* Create an event to record completion of a reduce attempt
@ -60,33 +77,22 @@ public class ReduceAttemptFinishedEvent implements HistoryEvent {
long shuffleFinishTime, long sortFinishTime, long finishTime,
String hostname, int port, String rackName, String state,
Counters counters, int[][] allSplits) {
datum.taskid = new Utf8(id.getTaskID().toString());
datum.attemptId = new Utf8(id.toString());
datum.taskType = new Utf8(taskType.name());
datum.taskStatus = new Utf8(taskStatus);
datum.shuffleFinishTime = shuffleFinishTime;
datum.sortFinishTime = sortFinishTime;
datum.finishTime = finishTime;
datum.hostname = new Utf8(hostname);
datum.port = port;
if (rackName != null) {
datum.rackname = new Utf8(rackName);
}
datum.state = new Utf8(state);
datum.counters = EventWriter.toAvro(counters);
datum.clockSplits
= AvroArrayUtils.toAvro
(ProgressSplitsBlock.arrayGetWallclockTime(allSplits));
datum.cpuUsages
= AvroArrayUtils.toAvro
(ProgressSplitsBlock.arrayGetCPUTime(allSplits));
datum.vMemKbytes
= AvroArrayUtils.toAvro
(ProgressSplitsBlock.arrayGetVMemKbytes(allSplits));
datum.physMemKbytes
= AvroArrayUtils.toAvro
(ProgressSplitsBlock.arrayGetPhysMemKbytes(allSplits));
this.attemptId = id;
this.taskType = taskType;
this.taskStatus = taskStatus;
this.shuffleFinishTime = shuffleFinishTime;
this.sortFinishTime = sortFinishTime;
this.finishTime = finishTime;
this.hostname = hostname;
this.rackName = rackName;
this.port = port;
this.state = state;
this.counters = counters;
this.allSplits = allSplits;
this.clockSplits = ProgressSplitsBlock.arrayGetWallclockTime(allSplits);
this.cpuUsages = ProgressSplitsBlock.arrayGetCPUTime(allSplits);
this.vMemKbytes = ProgressSplitsBlock.arrayGetVMemKbytes(allSplits);
this.physMemKbytes = ProgressSplitsBlock.arrayGetPhysMemKbytes(allSplits);
}
/**
@ -117,43 +123,87 @@ public class ReduceAttemptFinishedEvent implements HistoryEvent {
ReduceAttemptFinishedEvent() {}
public Object getDatum() { return datum; }
public void setDatum(Object datum) {
this.datum = (ReduceAttemptFinished)datum;
public Object getDatum() {
if (datum == null) {
datum = new ReduceAttemptFinished();
datum.taskid = new Utf8(attemptId.getTaskID().toString());
datum.attemptId = new Utf8(attemptId.toString());
datum.taskType = new Utf8(taskType.name());
datum.taskStatus = new Utf8(taskStatus);
datum.shuffleFinishTime = shuffleFinishTime;
datum.sortFinishTime = sortFinishTime;
datum.finishTime = finishTime;
datum.hostname = new Utf8(hostname);
datum.port = port;
if (rackName != null) {
datum.rackname = new Utf8(rackName);
}
datum.state = new Utf8(state);
datum.counters = EventWriter.toAvro(counters);
datum.clockSplits = AvroArrayUtils.toAvro(ProgressSplitsBlock
.arrayGetWallclockTime(allSplits));
datum.cpuUsages = AvroArrayUtils.toAvro(ProgressSplitsBlock
.arrayGetCPUTime(allSplits));
datum.vMemKbytes = AvroArrayUtils.toAvro(ProgressSplitsBlock
.arrayGetVMemKbytes(allSplits));
datum.physMemKbytes = AvroArrayUtils.toAvro(ProgressSplitsBlock
.arrayGetPhysMemKbytes(allSplits));
}
return datum;
}
public void setDatum(Object oDatum) {
this.datum = (ReduceAttemptFinished)oDatum;
this.attemptId = TaskAttemptID.forName(datum.attemptId.toString());
this.taskType = TaskType.valueOf(datum.taskType.toString());
this.taskStatus = datum.taskStatus.toString();
this.shuffleFinishTime = datum.shuffleFinishTime;
this.sortFinishTime = datum.sortFinishTime;
this.finishTime = datum.finishTime;
this.hostname = datum.hostname.toString();
this.rackName = datum.rackname.toString();
this.port = datum.port;
this.state = datum.state.toString();
this.counters = EventReader.fromAvro(datum.counters);
this.clockSplits = AvroArrayUtils.fromAvro(datum.clockSplits);
this.cpuUsages = AvroArrayUtils.fromAvro(datum.cpuUsages);
this.vMemKbytes = AvroArrayUtils.fromAvro(datum.vMemKbytes);
this.physMemKbytes = AvroArrayUtils.fromAvro(datum.physMemKbytes);
}
/** Get the Task ID */
public TaskID getTaskId() { return TaskID.forName(datum.taskid.toString()); }
public TaskID getTaskId() { return attemptId.getTaskID(); }
/** Get the attempt id */
public TaskAttemptID getAttemptId() {
return TaskAttemptID.forName(datum.attemptId.toString());
return TaskAttemptID.forName(attemptId.toString());
}
/** Get the task type */
public TaskType getTaskType() {
return TaskType.valueOf(datum.taskType.toString());
return TaskType.valueOf(taskType.toString());
}
/** Get the task status */
public String getTaskStatus() { return datum.taskStatus.toString(); }
public String getTaskStatus() { return taskStatus.toString(); }
/** Get the finish time of the sort phase */
public long getSortFinishTime() { return datum.sortFinishTime; }
public long getSortFinishTime() { return sortFinishTime; }
/** Get the finish time of the shuffle phase */
public long getShuffleFinishTime() { return datum.shuffleFinishTime; }
public long getShuffleFinishTime() { return shuffleFinishTime; }
/** Get the finish time of the attempt */
public long getFinishTime() { return datum.finishTime; }
public long getFinishTime() { return finishTime; }
/** Get the name of the host where the attempt ran */
public String getHostname() { return datum.hostname.toString(); }
public String getHostname() { return hostname.toString(); }
/** Get the tracker rpc port */
public int getPort() { return datum.port; }
public int getPort() { return port; }
/** Get the rack name of the node where the attempt ran */
public String getRackName() {
return datum.rackname == null ? null : datum.rackname.toString();
return rackName == null ? null : rackName.toString();
}
/** Get the state string */
public String getState() { return datum.state.toString(); }
public String getState() { return state.toString(); }
/** Get the counters for the attempt */
Counters getCounters() { return EventReader.fromAvro(datum.counters); }
Counters getCounters() { return counters; }
/** Get the event type */
public EventType getEventType() {
return EventType.REDUCE_ATTEMPT_FINISHED;
@ -161,16 +211,16 @@ public class ReduceAttemptFinishedEvent implements HistoryEvent {
public int[] getClockSplits() {
return AvroArrayUtils.fromAvro(datum.clockSplits);
return clockSplits;
}
public int[] getCpuUsages() {
return AvroArrayUtils.fromAvro(datum.cpuUsages);
return cpuUsages;
}
public int[] getVMemKbytes() {
return AvroArrayUtils.fromAvro(datum.vMemKbytes);
return vMemKbytes;
}
public int[] getPhysMemKbytes() {
return AvroArrayUtils.fromAvro(datum.physMemKbytes);
return physMemKbytes;
}
}
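
The event classes in this change all move to the same shape: plain Java fields hold the event data, and the Avro datum is only materialized inside getDatum() when the event is actually written to the history file, so an AM that merely holds the event in memory no longer carries a second, heavier copy. Below is a minimal, standalone sketch of that lazy-datum pattern; the names (LazyDatumEventSketch, FinishedDatum, TaskFinishedEventSketch) are invented for illustration and are not the real Hadoop or Avro types.

// Hypothetical sketch of the lazy-datum pattern used by the event classes above.
public class LazyDatumEventSketch {

  /** Stand-in for a generated Avro record. */
  static class FinishedDatum {
    String taskId;
    long finishTime;
  }

  static class TaskFinishedEventSketch {
    private FinishedDatum datum;     // built lazily, only for serialization
    private final String taskId;    // compact, primary representation
    private final long finishTime;

    TaskFinishedEventSketch(String taskId, long finishTime) {
      this.taskId = taskId;
      this.finishTime = finishTime;
    }

    /** Convert to the serialization form only when a writer asks for it. */
    Object getDatum() {
      if (datum == null) {
        datum = new FinishedDatum();
        datum.taskId = taskId;
        datum.finishTime = finishTime;
      }
      return datum;
    }

    /** Getters read the plain fields directly; no per-call parsing. */
    String getTaskId() { return taskId; }
    long getFinishTime() { return finishTime; }
  }

  public static void main(String[] args) {
    TaskFinishedEventSketch e =
        new TaskFinishedEventSketch("task_0001_m_000000", 42L);
    System.out.println(e.getTaskId() + " finished at " + e.getFinishTime());
    System.out.println("datum materialized only now: " + e.getDatum());
  }
}
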

View File

@ -18,8 +18,7 @@
package org.apache.hadoop.mapreduce.jobhistory;
import java.io.IOException;
import org.apache.avro.util.Utf8;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.mapreduce.Counters;
@ -27,8 +26,6 @@ import org.apache.hadoop.mapreduce.TaskAttemptID;
import org.apache.hadoop.mapreduce.TaskID;
import org.apache.hadoop.mapreduce.TaskType;
import org.apache.avro.util.Utf8;
/**
* Event to record successful task completion
*
@ -36,7 +33,17 @@ import org.apache.avro.util.Utf8;
@InterfaceAudience.Private
@InterfaceStability.Unstable
public class TaskAttemptFinishedEvent implements HistoryEvent {
private TaskAttemptFinished datum = new TaskAttemptFinished();
private TaskAttemptFinished datum = null;
private TaskAttemptID attemptId;
private TaskType taskType;
private String taskStatus;
private long finishTime;
private String rackName;
private String hostname;
private String state;
private Counters counters;
/**
* Create an event to record successful finishes for setup and cleanup
@ -53,8 +60,23 @@ public class TaskAttemptFinishedEvent implements HistoryEvent {
TaskType taskType, String taskStatus,
long finishTime, String rackName,
String hostname, String state, Counters counters) {
datum.taskid = new Utf8(id.getTaskID().toString());
datum.attemptId = new Utf8(id.toString());
this.attemptId = id;
this.taskType = taskType;
this.taskStatus = taskStatus;
this.finishTime = finishTime;
this.rackName = rackName;
this.hostname = hostname;
this.state = state;
this.counters = counters;
}
TaskAttemptFinishedEvent() {}
public Object getDatum() {
if (datum == null) {
datum = new TaskAttemptFinished();
datum.taskid = new Utf8(attemptId.getTaskID().toString());
datum.attemptId = new Utf8(attemptId.toString());
datum.taskType = new Utf8(taskType.name());
datum.taskStatus = new Utf8(taskStatus);
datum.finishTime = finishTime;
@ -65,40 +87,46 @@ public class TaskAttemptFinishedEvent implements HistoryEvent {
datum.state = new Utf8(state);
datum.counters = EventWriter.toAvro(counters);
}
TaskAttemptFinishedEvent() {}
public Object getDatum() { return datum; }
public void setDatum(Object datum) {
this.datum = (TaskAttemptFinished)datum;
return datum;
}
public void setDatum(Object oDatum) {
this.datum = (TaskAttemptFinished)oDatum;
this.attemptId = TaskAttemptID.forName(datum.attemptId.toString());
this.taskType = TaskType.valueOf(datum.taskType.toString());
this.taskStatus = datum.taskStatus.toString();
this.finishTime = datum.finishTime;
this.rackName = datum.rackname.toString();
this.hostname = datum.hostname.toString();
this.state = datum.state.toString();
this.counters = EventReader.fromAvro(datum.counters);
}
/** Get the task ID */
public TaskID getTaskId() { return TaskID.forName(datum.taskid.toString()); }
public TaskID getTaskId() { return attemptId.getTaskID(); }
/** Get the task attempt id */
public TaskAttemptID getAttemptId() {
return TaskAttemptID.forName(datum.attemptId.toString());
return TaskAttemptID.forName(attemptId.toString());
}
/** Get the task type */
public TaskType getTaskType() {
return TaskType.valueOf(datum.taskType.toString());
return TaskType.valueOf(taskType.toString());
}
/** Get the task status */
public String getTaskStatus() { return datum.taskStatus.toString(); }
public String getTaskStatus() { return taskStatus.toString(); }
/** Get the attempt finish time */
public long getFinishTime() { return datum.finishTime; }
public long getFinishTime() { return finishTime; }
/** Get the host where the attempt executed */
public String getHostname() { return datum.hostname.toString(); }
public String getHostname() { return hostname.toString(); }
/** Get the rackname where the attempt executed */
public String getRackName() {
return datum.rackname == null ? null : datum.rackname.toString();
return rackName == null ? null : rackName.toString();
}
/** Get the state string */
public String getState() { return datum.state.toString(); }
public String getState() { return state.toString(); }
/** Get the counters for the attempt */
Counters getCounters() { return EventReader.fromAvro(datum.counters); }
Counters getCounters() { return counters; }
/** Get the event type */
public EventType getEventType() {
// Note that the task type can be setup/map/reduce/cleanup but the

View File

@ -18,16 +18,13 @@
package org.apache.hadoop.mapreduce.jobhistory;
import java.io.IOException;
import org.apache.avro.util.Utf8;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.mapreduce.Counters;
import org.apache.hadoop.mapreduce.TaskID;
import org.apache.hadoop.mapreduce.TaskType;
import org.apache.avro.util.Utf8;
/**
* Event to record the successful completion of a task
*
@ -35,7 +32,14 @@ import org.apache.avro.util.Utf8;
@InterfaceAudience.Private
@InterfaceStability.Unstable
public class TaskFinishedEvent implements HistoryEvent {
private TaskFinished datum = new TaskFinished();
private TaskFinished datum = null;
private TaskID taskid;
private long finishTime;
private TaskType taskType;
private String status;
private Counters counters;
/**
* Create an event to record the successful completion of a task
@ -48,32 +52,48 @@ public class TaskFinishedEvent implements HistoryEvent {
public TaskFinishedEvent(TaskID id, long finishTime,
TaskType taskType,
String status, Counters counters) {
datum.taskid = new Utf8(id.toString());
this.taskid = id;
this.finishTime = finishTime;
this.taskType = taskType;
this.status = status;
this.counters = counters;
}
TaskFinishedEvent() {}
public Object getDatum() {
if (datum == null) {
datum = new TaskFinished();
datum.taskid = new Utf8(taskid.toString());
datum.finishTime = finishTime;
datum.counters = EventWriter.toAvro(counters);
datum.taskType = new Utf8(taskType.name());
datum.status = new Utf8(status);
}
return datum;
}
TaskFinishedEvent() {}
public Object getDatum() { return datum; }
public void setDatum(Object datum) {
this.datum = (TaskFinished)datum;
public void setDatum(Object oDatum) {
this.datum = (TaskFinished)oDatum;
this.taskid = TaskID.forName(datum.taskid.toString());
this.finishTime = datum.finishTime;
this.taskType = TaskType.valueOf(datum.taskType.toString());
this.status = datum.status.toString();
this.counters = EventReader.fromAvro(datum.counters);
}
/** Get task id */
public TaskID getTaskId() { return TaskID.forName(datum.taskid.toString()); }
public TaskID getTaskId() { return TaskID.forName(taskid.toString()); }
/** Get the task finish time */
public long getFinishTime() { return datum.finishTime; }
public long getFinishTime() { return finishTime; }
/** Get task counters */
public Counters getCounters() { return EventReader.fromAvro(datum.counters); }
public Counters getCounters() { return counters; }
/** Get task type */
public TaskType getTaskType() {
return TaskType.valueOf(datum.taskType.toString());
return TaskType.valueOf(taskType.toString());
}
/** Get task status */
public String getTaskStatus() { return datum.status.toString(); }
public String getTaskStatus() { return status.toString(); }
/** Get event type */
public EventType getEventType() {
return EventType.TASK_FINISHED;

View File

@ -32,13 +32,13 @@ import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.mapred.JobACLsManager;
import org.apache.hadoop.mapreduce.Counters;
import org.apache.hadoop.mapreduce.JobACL;
import org.apache.hadoop.mapreduce.TypeConverter;
import org.apache.hadoop.mapreduce.jobhistory.JobHistoryParser;
import org.apache.hadoop.mapreduce.jobhistory.JobHistoryParser.JobInfo;
import org.apache.hadoop.mapreduce.jobhistory.JobHistoryParser.TaskInfo;
import org.apache.hadoop.mapreduce.v2.api.records.AMInfo;
import org.apache.hadoop.mapreduce.v2.api.records.Counters;
import org.apache.hadoop.mapreduce.v2.api.records.JobId;
import org.apache.hadoop.mapreduce.v2.api.records.JobReport;
import org.apache.hadoop.mapreduce.v2.api.records.JobState;
@ -89,7 +89,7 @@ public class CompletedJob implements org.apache.hadoop.mapreduce.v2.app.job.Job
loadFullHistoryData(loadTasks, historyFile);
user = userName;
counters = TypeConverter.toYarn(jobInfo.getTotalCounters());
counters = jobInfo.getTotalCounters();
diagnostics.add(jobInfo.getErrorInfo());
report =
RecordFactoryProvider.getRecordFactory(null).newRecordInstance(
@ -121,7 +121,7 @@ public class CompletedJob implements org.apache.hadoop.mapreduce.v2.app.job.Job
}
@Override
public Counters getCounters() {
public Counters getAllCounters() {
return counters;
}

View File

@ -24,10 +24,10 @@ import java.util.Map;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.mapreduce.Counters;
import org.apache.hadoop.mapreduce.TypeConverter;
import org.apache.hadoop.mapreduce.jobhistory.JobHistoryParser.TaskAttemptInfo;
import org.apache.hadoop.mapreduce.jobhistory.JobHistoryParser.TaskInfo;
import org.apache.hadoop.mapreduce.v2.api.records.Counters;
import org.apache.hadoop.mapreduce.v2.api.records.TaskAttemptId;
import org.apache.hadoop.mapreduce.v2.api.records.TaskId;
import org.apache.hadoop.mapreduce.v2.api.records.TaskReport;
@ -60,7 +60,7 @@ public class CompletedTask implements Task {
this.finishTime = taskInfo.getFinishTime();
this.type = TypeConverter.toYarn(taskInfo.getTaskType());
if (taskInfo.getCounters() != null)
this.counters = TypeConverter.toYarn(taskInfo.getCounters());
this.counters = taskInfo.getCounters();
if (taskInfo.getTaskStatus() != null) {
this.state = TaskState.valueOf(taskInfo.getTaskStatus());
} else {
@ -86,7 +86,7 @@ public class CompletedTask implements Task {
report.setFinishTime(finishTime);
report.setTaskState(state);
report.setProgress(getProgress());
report.setCounters(getCounters());
report.setCounters(TypeConverter.toYarn(getCounters()));
report.addAllRunningAttempts(new ArrayList<TaskAttemptId>(attempts.keySet()));
}

View File

@ -21,9 +21,9 @@ package org.apache.hadoop.mapreduce.v2.hs;
import java.util.ArrayList;
import java.util.List;
import org.apache.hadoop.mapreduce.Counters;
import org.apache.hadoop.mapreduce.TypeConverter;
import org.apache.hadoop.mapreduce.jobhistory.JobHistoryParser.TaskAttemptInfo;
import org.apache.hadoop.mapreduce.v2.api.records.Counters;
import org.apache.hadoop.mapreduce.v2.api.records.TaskAttemptId;
import org.apache.hadoop.mapreduce.v2.api.records.TaskAttemptReport;
import org.apache.hadoop.mapreduce.v2.api.records.TaskAttemptState;
@ -46,8 +46,9 @@ public class CompletedTaskAttempt implements TaskAttempt {
CompletedTaskAttempt(TaskId taskId, TaskAttemptInfo attemptInfo) {
this.attemptInfo = attemptInfo;
this.attemptId = TypeConverter.toYarn(attemptInfo.getAttemptId());
if (attemptInfo.getCounters() != null)
this.counters = TypeConverter.toYarn(attemptInfo.getCounters());
if (attemptInfo.getCounters() != null) {
this.counters = attemptInfo.getCounters();
}
if (attemptInfo.getTaskStatus() != null) {
this.state = TaskAttemptState.valueOf(attemptInfo.getTaskStatus());
} else {
@ -61,7 +62,6 @@ public class CompletedTaskAttempt implements TaskAttempt {
}
report = RecordFactoryProvider.getRecordFactory(null).newRecordInstance(TaskAttemptReport.class);
report.setCounters(counters);
report.setTaskAttemptId(attemptId);
report.setTaskAttemptState(state);
@ -78,7 +78,7 @@ public class CompletedTaskAttempt implements TaskAttempt {
}
// report.setPhase(attemptInfo.get); //TODO
report.setStateString(attemptInfo.getState());
report.setCounters(getCounters());
report.setCounters(TypeConverter.toYarn(getCounters()));
report.setContainerId(attemptInfo.getContainerId());
if (attemptInfo.getHostname() == null) {
report.setNodeManagerHost("UNKNOWN");

View File

@ -33,6 +33,7 @@ import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.CommonConfigurationKeysPublic;
import org.apache.hadoop.ipc.Server;
import org.apache.hadoop.mapreduce.JobACL;
import org.apache.hadoop.mapreduce.TypeConverter;
import org.apache.hadoop.mapreduce.v2.api.MRClientProtocol;
import org.apache.hadoop.mapreduce.v2.api.protocolrecords.FailTaskAttemptRequest;
import org.apache.hadoop.mapreduce.v2.api.protocolrecords.FailTaskAttemptResponse;
@ -190,7 +191,7 @@ public class HistoryClientService extends AbstractService {
JobId jobId = request.getJobId();
Job job = verifyAndGetJob(jobId);
GetCountersResponse response = recordFactory.newRecordInstance(GetCountersResponse.class);
response.setCounters(job.getCounters());
response.setCounters(TypeConverter.toYarn(job.getAllCounters()));
return response;
}
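
On the history-server side, counters are now stored once in the org.apache.hadoop.mapreduce.Counters form and converted with TypeConverter.toYarn(...) only when a report or RPC response is assembled. The sketch below illustrates that boundary-conversion idea with invented stand-in types (Counters, CountersRecord, toRecord); it is not the actual Hadoop API, just a simplified model of keeping one canonical copy and converting at the edge.

// Hypothetical sketch: one stored counters object, converted only for the reply.
import java.util.HashMap;
import java.util.Map;

public class CountersBoundarySketch {

  /** Stand-in for the canonical in-memory counters form. */
  static class Counters {
    final Map<String, Long> values = new HashMap<>();
    void increment(String name, long by) { values.merge(name, by, Long::sum); }
  }

  /** Stand-in for the record form placed on an RPC response. */
  static class CountersRecord {
    final Map<String, Long> values;
    CountersRecord(Map<String, Long> values) { this.values = values; }
  }

  /** Conversion happens once, at the API boundary. */
  static CountersRecord toRecord(Counters c) {
    return new CountersRecord(new HashMap<>(c.values));
  }

  public static void main(String[] args) {
    Counters stored = new Counters();            // the only copy held in memory
    stored.increment("MAP_INPUT_RECORDS", 100L);
    CountersRecord response = toRecord(stored);  // built only for the reply
    System.out.println(response.values);
  }
}
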

View File

@ -22,9 +22,9 @@ import java.util.List;
import java.util.Map;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.mapreduce.Counters;
import org.apache.hadoop.mapreduce.JobACL;
import org.apache.hadoop.mapreduce.v2.api.records.AMInfo;
import org.apache.hadoop.mapreduce.v2.api.records.Counters;
import org.apache.hadoop.mapreduce.v2.api.records.JobId;
import org.apache.hadoop.mapreduce.v2.api.records.JobReport;
import org.apache.hadoop.mapreduce.v2.api.records.JobState;
@ -95,7 +95,7 @@ public class PartialJob implements org.apache.hadoop.mapreduce.v2.app.job.Job {
}
@Override
public Counters getCounters() {
public Counters getAllCounters() {
return null;
}

View File

@ -699,7 +699,7 @@ public class TestHsWebServicesAttempts extends JerseyTest {
assertTrue("name not set", (name != null && !name.isEmpty()));
JSONArray counters = counterGroup.getJSONArray("counter");
for (int j = 0; j < counters.length(); j++) {
JSONObject counter = counters.getJSONObject(i);
JSONObject counter = counters.getJSONObject(j);
String counterName = counter.getString("name");
assertTrue("name not set",
(counterName != null && !counterName.isEmpty()));

View File

@ -788,7 +788,7 @@ public class TestHsWebServicesTasks extends JerseyTest {
assertTrue("name not set", (name != null && !name.isEmpty()));
JSONArray counters = counterGroup.getJSONArray("counter");
for (int j = 0; j < counters.length(); j++) {
JSONObject counter = counters.getJSONObject(i);
JSONObject counter = counters.getJSONObject(j);
String counterName = counter.getString("name");
assertTrue("name not set",
(counterName != null && !counterName.isEmpty()));