MAPREDUCE-3511. Removed a multitude of cloned/duplicate counters in the AM thereby reducing the AM heap size and preventing full GCs. (vinodkv)

git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/trunk@1229347 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Vinod Kumar Vavilapalli 2012-01-09 21:04:58 +00:00
parent a7195bdd14
commit e8645636ce
37 changed files with 584 additions and 405 deletions

View File

@ -174,6 +174,9 @@ Release 0.23.1 - Unreleased
MAPREDUCE-3569. TaskAttemptListener holds a global lock for all MAPREDUCE-3569. TaskAttemptListener holds a global lock for all
task-updates. (Vinod Kumar Vavilapalli via sseth) task-updates. (Vinod Kumar Vavilapalli via sseth)
MAPREDUCE-3511. Removed a multitude of cloned/duplicate counters in the AM
thereby reducing the AM heap size and preventing full GCs. (vinodkv)
BUG FIXES BUG FIXES
MAPREDUCE-3221. Reenabled the previously ignored test in TestSubmitJob MAPREDUCE-3221. Reenabled the previously ignored test in TestSubmitJob

View File

@ -325,9 +325,11 @@ public boolean statusUpdate(TaskAttemptID taskAttemptID,
taskAttemptStatus.outputSize = taskStatus.getOutputSize(); taskAttemptStatus.outputSize = taskStatus.getOutputSize();
// Task sends the updated phase to the TT. // Task sends the updated phase to the TT.
taskAttemptStatus.phase = TypeConverter.toYarn(taskStatus.getPhase()); taskAttemptStatus.phase = TypeConverter.toYarn(taskStatus.getPhase());
// Counters are updated by the task. // Counters are updated by the task. Convert counters into new format as
taskAttemptStatus.counters = // that is the primary storage format inside the AM to avoid multiple
TypeConverter.toYarn(taskStatus.getCounters()); // conversions and unnecessary heap usage.
taskAttemptStatus.counters = new org.apache.hadoop.mapreduce.Counters(
taskStatus.getCounters());
// Map Finish time set by the task (map only) // Map Finish time set by the task (map only)
if (taskStatus.getIsMap() && taskStatus.getMapFinishTime() != 0) { if (taskStatus.getIsMap() && taskStatus.getMapFinishTime() != 0) {

View File

@ -36,10 +36,11 @@
import org.apache.hadoop.fs.FileUtil; import org.apache.hadoop.fs.FileUtil;
import org.apache.hadoop.fs.Path; import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.permission.FsPermission; import org.apache.hadoop.fs.permission.FsPermission;
import org.apache.hadoop.mapreduce.Counter;
import org.apache.hadoop.mapreduce.Counters;
import org.apache.hadoop.mapreduce.JobCounter; import org.apache.hadoop.mapreduce.JobCounter;
import org.apache.hadoop.mapreduce.MRJobConfig; import org.apache.hadoop.mapreduce.MRJobConfig;
import org.apache.hadoop.mapreduce.TaskType; import org.apache.hadoop.mapreduce.TaskType;
import org.apache.hadoop.mapreduce.v2.api.records.Counter;
import org.apache.hadoop.mapreduce.v2.api.records.JobId; import org.apache.hadoop.mapreduce.v2.api.records.JobId;
import org.apache.hadoop.mapreduce.v2.api.records.JobState; import org.apache.hadoop.mapreduce.v2.api.records.JobState;
import org.apache.hadoop.mapreduce.v2.app.AppContext; import org.apache.hadoop.mapreduce.v2.app.AppContext;
@ -63,6 +64,8 @@ public class JobHistoryEventHandler extends AbstractService
private final AppContext context; private final AppContext context;
private final int startCount; private final int startCount;
private int eventCounter;
//TODO Does the FS object need to be different ? //TODO Does the FS object need to be different ?
private FileSystem stagingDirFS; // log Dir FileSystem private FileSystem stagingDirFS; // log Dir FileSystem
private FileSystem doneDirFS; // done Dir FileSystem private FileSystem doneDirFS; // done Dir FileSystem
@ -210,6 +213,16 @@ public void start() {
public void run() { public void run() {
JobHistoryEvent event = null; JobHistoryEvent event = null;
while (!stopped && !Thread.currentThread().isInterrupted()) { while (!stopped && !Thread.currentThread().isInterrupted()) {
// Log the size of the history-event-queue every so often.
if (eventCounter % 1000 == 0) {
eventCounter = 0;
LOG.info("Size of the JobHistory event queue is "
+ eventQueue.size());
} else {
eventCounter++;
}
try { try {
event = eventQueue.take(); event = eventQueue.take();
} catch (InterruptedException e) { } catch (InterruptedException e) {
@ -238,7 +251,8 @@ public void run() {
@Override @Override
public void stop() { public void stop() {
LOG.info("Stopping JobHistoryEventHandler"); LOG.info("Stopping JobHistoryEventHandler. "
+ "Size of the outstanding queue size is " + eventQueue.size());
stopped = true; stopped = true;
//do not interrupt while event handling is in progress //do not interrupt while event handling is in progress
synchronized(lock) { synchronized(lock) {
@ -483,7 +497,7 @@ public void processEventForJobSummary(HistoryEvent event, JobSummary summary,
.toString()); .toString());
// TODO JOB_FINISHED does not have state. Effectively job history does not // TODO JOB_FINISHED does not have state. Effectively job history does not
// have state about the finished job. // have state about the finished job.
setSummarySlotSeconds(summary, jobId); setSummarySlotSeconds(summary, jfe.getTotalCounters());
break; break;
case JOB_FAILED: case JOB_FAILED:
case JOB_KILLED: case JOB_KILLED:
@ -492,21 +506,21 @@ public void processEventForJobSummary(HistoryEvent event, JobSummary summary,
summary.setNumFinishedMaps(context.getJob(jobId).getTotalMaps()); summary.setNumFinishedMaps(context.getJob(jobId).getTotalMaps());
summary.setNumFinishedReduces(context.getJob(jobId).getTotalReduces()); summary.setNumFinishedReduces(context.getJob(jobId).getTotalReduces());
summary.setJobFinishTime(juce.getFinishTime()); summary.setJobFinishTime(juce.getFinishTime());
setSummarySlotSeconds(summary, jobId); setSummarySlotSeconds(summary, context.getJob(jobId).getAllCounters());
break; break;
} }
} }
private void setSummarySlotSeconds(JobSummary summary, JobId jobId) { private void setSummarySlotSeconds(JobSummary summary, Counters allCounters) {
Counter slotMillisMapCounter =
context.getJob(jobId).getCounters() Counter slotMillisMapCounter = allCounters
.getCounter(JobCounter.SLOTS_MILLIS_MAPS); .findCounter(JobCounter.SLOTS_MILLIS_MAPS);
if (slotMillisMapCounter != null) { if (slotMillisMapCounter != null) {
summary.setMapSlotSeconds(slotMillisMapCounter.getValue()); summary.setMapSlotSeconds(slotMillisMapCounter.getValue());
} }
Counter slotMillisReduceCounter =
context.getJob(jobId).getCounters() Counter slotMillisReduceCounter = allCounters
.getCounter(JobCounter.SLOTS_MILLIS_REDUCES); .findCounter(JobCounter.SLOTS_MILLIS_REDUCES);
if (slotMillisReduceCounter != null) { if (slotMillisReduceCounter != null) {
summary.setMapSlotSeconds(slotMillisReduceCounter.getValue()); summary.setMapSlotSeconds(slotMillisReduceCounter.getValue());
} }

View File

@ -31,6 +31,7 @@
import org.apache.hadoop.fs.CommonConfigurationKeysPublic; import org.apache.hadoop.fs.CommonConfigurationKeysPublic;
import org.apache.hadoop.ipc.Server; import org.apache.hadoop.ipc.Server;
import org.apache.hadoop.mapreduce.MRJobConfig; import org.apache.hadoop.mapreduce.MRJobConfig;
import org.apache.hadoop.mapreduce.TypeConverter;
import org.apache.hadoop.mapreduce.v2.api.MRClientProtocol; import org.apache.hadoop.mapreduce.v2.api.MRClientProtocol;
import org.apache.hadoop.mapreduce.v2.api.protocolrecords.FailTaskAttemptRequest; import org.apache.hadoop.mapreduce.v2.api.protocolrecords.FailTaskAttemptRequest;
import org.apache.hadoop.mapreduce.v2.api.protocolrecords.FailTaskAttemptResponse; import org.apache.hadoop.mapreduce.v2.api.protocolrecords.FailTaskAttemptResponse;
@ -223,7 +224,7 @@ public GetCountersResponse getCounters(GetCountersRequest request)
Job job = verifyAndGetJob(jobId, false); Job job = verifyAndGetJob(jobId, false);
GetCountersResponse response = GetCountersResponse response =
recordFactory.newRecordInstance(GetCountersResponse.class); recordFactory.newRecordInstance(GetCountersResponse.class);
response.setCounters(job.getCounters()); response.setCounters(TypeConverter.toYarn(job.getAllCounters()));
return response; return response;
} }
@ -238,7 +239,6 @@ public GetJobReportResponse getJobReport(GetJobReportRequest request)
return response; return response;
} }
@Override @Override
public GetTaskAttemptReportResponse getTaskAttemptReport( public GetTaskAttemptReportResponse getTaskAttemptReport(
GetTaskAttemptReportRequest request) throws YarnRemoteException { GetTaskAttemptReportRequest request) throws YarnRemoteException {
@ -356,6 +356,8 @@ public FailTaskAttemptResponse failTaskAttempt(
return response; return response;
} }
private final Object getTaskReportsLock = new Object();
@Override @Override
public GetTaskReportsResponse getTaskReports( public GetTaskReportsResponse getTaskReports(
GetTaskReportsRequest request) throws YarnRemoteException { GetTaskReportsRequest request) throws YarnRemoteException {
@ -366,12 +368,18 @@ public GetTaskReportsResponse getTaskReports(
recordFactory.newRecordInstance(GetTaskReportsResponse.class); recordFactory.newRecordInstance(GetTaskReportsResponse.class);
Job job = verifyAndGetJob(jobId, false); Job job = verifyAndGetJob(jobId, false);
LOG.info("Getting task report for " + taskType + " " + jobId);
Collection<Task> tasks = job.getTasks(taskType).values(); Collection<Task> tasks = job.getTasks(taskType).values();
LOG.info("Getting task report size " + tasks.size()); LOG.info("Getting task report for " + taskType + " " + jobId
for (Task task : tasks) { + ". Report-size will be " + tasks.size());
response.addTaskReport(task.getReport());
} // Take lock to allow only one call, otherwise heap will blow up because
// of counters in the report when there are multiple callers.
synchronized (getTaskReportsLock) {
for (Task task : tasks) {
response.addTaskReport(task.getReport());
}
}
return response; return response;
} }
} }

View File

@ -22,9 +22,9 @@
import java.util.Map; import java.util.Map;
import org.apache.hadoop.fs.Path; import org.apache.hadoop.fs.Path;
import org.apache.hadoop.mapreduce.Counters;
import org.apache.hadoop.mapreduce.JobACL; import org.apache.hadoop.mapreduce.JobACL;
import org.apache.hadoop.mapreduce.v2.api.records.AMInfo; import org.apache.hadoop.mapreduce.v2.api.records.AMInfo;
import org.apache.hadoop.mapreduce.v2.api.records.Counters;
import org.apache.hadoop.mapreduce.v2.api.records.JobId; import org.apache.hadoop.mapreduce.v2.api.records.JobId;
import org.apache.hadoop.mapreduce.v2.api.records.JobReport; import org.apache.hadoop.mapreduce.v2.api.records.JobReport;
import org.apache.hadoop.mapreduce.v2.api.records.JobState; import org.apache.hadoop.mapreduce.v2.api.records.JobState;
@ -44,7 +44,15 @@ public interface Job {
String getName(); String getName();
JobState getState(); JobState getState();
JobReport getReport(); JobReport getReport();
Counters getCounters();
/**
* Get all the counters of this job. This includes job-counters aggregated
* together with the counters of each task. This creates a clone of the
* Counters, so use this judiciously.
* @return job-counters and aggregate task-counters
*/
Counters getAllCounters();
Map<TaskId,Task> getTasks(); Map<TaskId,Task> getTasks();
Map<TaskId,Task> getTasks(TaskType taskType); Map<TaskId,Task> getTasks(TaskType taskType);
Task getTask(TaskId taskID); Task getTask(TaskId taskID);

View File

@ -20,7 +20,7 @@
import java.util.Map; import java.util.Map;
import org.apache.hadoop.mapreduce.v2.api.records.Counters; import org.apache.hadoop.mapreduce.Counters;
import org.apache.hadoop.mapreduce.v2.api.records.TaskAttemptId; import org.apache.hadoop.mapreduce.v2.api.records.TaskAttemptId;
import org.apache.hadoop.mapreduce.v2.api.records.TaskId; import org.apache.hadoop.mapreduce.v2.api.records.TaskId;
import org.apache.hadoop.mapreduce.v2.api.records.TaskReport; import org.apache.hadoop.mapreduce.v2.api.records.TaskReport;

View File

@ -20,7 +20,7 @@
import java.util.List; import java.util.List;
import org.apache.hadoop.mapreduce.v2.api.records.Counters; import org.apache.hadoop.mapreduce.Counters;
import org.apache.hadoop.mapreduce.v2.api.records.TaskAttemptId; import org.apache.hadoop.mapreduce.v2.api.records.TaskAttemptId;
import org.apache.hadoop.mapreduce.v2.api.records.TaskAttemptReport; import org.apache.hadoop.mapreduce.v2.api.records.TaskAttemptReport;
import org.apache.hadoop.mapreduce.v2.api.records.TaskAttemptState; import org.apache.hadoop.mapreduce.v2.api.records.TaskAttemptState;

View File

@ -20,12 +20,11 @@
import java.util.List; import java.util.List;
import org.apache.hadoop.mapreduce.v2.api.records.Counters; import org.apache.hadoop.mapreduce.Counters;
import org.apache.hadoop.mapreduce.v2.api.records.Phase; import org.apache.hadoop.mapreduce.v2.api.records.Phase;
import org.apache.hadoop.mapreduce.v2.api.records.TaskAttemptId; import org.apache.hadoop.mapreduce.v2.api.records.TaskAttemptId;
import org.apache.hadoop.mapreduce.v2.api.records.TaskAttemptState; import org.apache.hadoop.mapreduce.v2.api.records.TaskAttemptState;
public class TaskAttemptStatusUpdateEvent extends TaskAttemptEvent { public class TaskAttemptStatusUpdateEvent extends TaskAttemptEvent {
private TaskAttemptStatus reportedTaskAttemptStatus; private TaskAttemptStatus reportedTaskAttemptStatus;

View File

@ -41,6 +41,7 @@
import org.apache.hadoop.io.Text; import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.JobACLsManager; import org.apache.hadoop.mapred.JobACLsManager;
import org.apache.hadoop.mapred.JobConf; import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapreduce.Counters;
import org.apache.hadoop.mapreduce.JobACL; import org.apache.hadoop.mapreduce.JobACL;
import org.apache.hadoop.mapreduce.JobContext; import org.apache.hadoop.mapreduce.JobContext;
import org.apache.hadoop.mapreduce.MRJobConfig; import org.apache.hadoop.mapreduce.MRJobConfig;
@ -61,9 +62,6 @@
import org.apache.hadoop.mapreduce.split.SplitMetaInfoReader; import org.apache.hadoop.mapreduce.split.SplitMetaInfoReader;
import org.apache.hadoop.mapreduce.task.JobContextImpl; import org.apache.hadoop.mapreduce.task.JobContextImpl;
import org.apache.hadoop.mapreduce.v2.api.records.AMInfo; import org.apache.hadoop.mapreduce.v2.api.records.AMInfo;
import org.apache.hadoop.mapreduce.v2.api.records.Counter;
import org.apache.hadoop.mapreduce.v2.api.records.CounterGroup;
import org.apache.hadoop.mapreduce.v2.api.records.Counters;
import org.apache.hadoop.mapreduce.v2.api.records.JobId; import org.apache.hadoop.mapreduce.v2.api.records.JobId;
import org.apache.hadoop.mapreduce.v2.api.records.JobReport; import org.apache.hadoop.mapreduce.v2.api.records.JobReport;
import org.apache.hadoop.mapreduce.v2.api.records.JobState; import org.apache.hadoop.mapreduce.v2.api.records.JobState;
@ -99,7 +97,6 @@
import org.apache.hadoop.yarn.YarnException; import org.apache.hadoop.yarn.YarnException;
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
import org.apache.hadoop.yarn.event.EventHandler; import org.apache.hadoop.yarn.event.EventHandler;
import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
import org.apache.hadoop.yarn.state.InvalidStateTransitonException; import org.apache.hadoop.yarn.state.InvalidStateTransitonException;
import org.apache.hadoop.yarn.state.MultipleArcTransition; import org.apache.hadoop.yarn.state.MultipleArcTransition;
import org.apache.hadoop.yarn.state.SingleArcTransition; import org.apache.hadoop.yarn.state.SingleArcTransition;
@ -109,10 +106,13 @@
/** Implementation of Job interface. Maintains the state machines of Job. /** Implementation of Job interface. Maintains the state machines of Job.
* The read and write calls use ReadWriteLock for concurrency. * The read and write calls use ReadWriteLock for concurrency.
*/ */
@SuppressWarnings({ "rawtypes", "deprecation" }) @SuppressWarnings({ "rawtypes", "deprecation", "unchecked" })
public class JobImpl implements org.apache.hadoop.mapreduce.v2.app.job.Job, public class JobImpl implements org.apache.hadoop.mapreduce.v2.app.job.Job,
EventHandler<JobEvent> { EventHandler<JobEvent> {
private static final TaskAttemptCompletionEvent[]
EMPTY_TASK_ATTEMPT_COMPLETION_EVENTS = new TaskAttemptCompletionEvent[0];
private static final Log LOG = LogFactory.getLog(JobImpl.class); private static final Log LOG = LogFactory.getLog(JobImpl.class);
//The maximum fraction of fetch failures allowed for a map //The maximum fraction of fetch failures allowed for a map
@ -152,7 +152,7 @@ public class JobImpl implements org.apache.hadoop.mapreduce.v2.app.job.Job,
private boolean lazyTasksCopyNeeded = false; private boolean lazyTasksCopyNeeded = false;
volatile Map<TaskId, Task> tasks = new LinkedHashMap<TaskId, Task>(); volatile Map<TaskId, Task> tasks = new LinkedHashMap<TaskId, Task>();
private Counters jobCounters = newCounters(); private Counters jobCounters = new Counters();
// FIXME: // FIXME:
// //
// Can then replace task-level uber counters (MR-2424) with job-level ones // Can then replace task-level uber counters (MR-2424) with job-level ones
@ -475,88 +475,29 @@ public boolean isUber() {
} }
@Override @Override
public Counters getCounters() { public Counters getAllCounters() {
Counters counters = newCounters(); Counters counters = new Counters();
readLock.lock(); readLock.lock();
try { try {
incrAllCounters(counters, jobCounters); counters.incrAllCounters(jobCounters);
return incrTaskCounters(counters, tasks.values()); return incrTaskCounters(counters, tasks.values());
} finally { } finally {
readLock.unlock(); readLock.unlock();
} }
} }
private Counters getTypeCounters(Set<TaskId> taskIds) { public static Counters incrTaskCounters(
Counters counters = newCounters(); Counters counters, Collection<Task> tasks) {
for (TaskId taskId : taskIds) {
Task task = tasks.get(taskId);
incrAllCounters(counters, task.getCounters());
}
return counters;
}
private Counters getMapCounters() {
readLock.lock();
try {
return getTypeCounters(mapTasks);
} finally {
readLock.unlock();
}
}
private Counters getReduceCounters() {
readLock.lock();
try {
return getTypeCounters(reduceTasks);
} finally {
readLock.unlock();
}
}
public static Counters newCounters() {
Counters counters = RecordFactoryProvider.getRecordFactory(null)
.newRecordInstance(Counters.class);
return counters;
}
public static Counters incrTaskCounters(Counters counters,
Collection<Task> tasks) {
for (Task task : tasks) { for (Task task : tasks) {
incrAllCounters(counters, task.getCounters()); counters.incrAllCounters(task.getCounters());
} }
return counters; return counters;
} }
public static void incrAllCounters(Counters counters, Counters other) {
if (other != null) {
for (CounterGroup otherGroup: other.getAllCounterGroups().values()) {
CounterGroup group = counters.getCounterGroup(otherGroup.getName());
if (group == null) {
group = RecordFactoryProvider.getRecordFactory(null)
.newRecordInstance(CounterGroup.class);
group.setName(otherGroup.getName());
counters.setCounterGroup(group.getName(), group);
}
group.setDisplayName(otherGroup.getDisplayName());
for (Counter otherCounter : otherGroup.getAllCounters().values()) {
Counter counter = group.getCounter(otherCounter.getName());
if (counter == null) {
counter = RecordFactoryProvider.getRecordFactory(null)
.newRecordInstance(Counter.class);
counter.setName(otherCounter.getName());
group.setCounter(counter.getName(), counter);
}
counter.setDisplayName(otherCounter.getDisplayName());
counter.setValue(counter.getValue() + otherCounter.getValue());
}
}
}
}
@Override @Override
public TaskAttemptCompletionEvent[] getTaskAttemptCompletionEvents( public TaskAttemptCompletionEvent[] getTaskAttemptCompletionEvents(
int fromEventId, int maxEvents) { int fromEventId, int maxEvents) {
TaskAttemptCompletionEvent[] events = new TaskAttemptCompletionEvent[0]; TaskAttemptCompletionEvent[] events = EMPTY_TASK_ATTEMPT_COMPLETION_EVENTS;
readLock.lock(); readLock.lock();
try { try {
if (taskAttemptCompletionEvents.size() > fromEventId) { if (taskAttemptCompletionEvents.size() > fromEventId) {
@ -1204,13 +1145,24 @@ private void abortJob(
// area. May need to create a new event type for this if JobFinished should // area. May need to create a new event type for this if JobFinished should
// not be generated for KilledJobs, etc. // not be generated for KilledJobs, etc.
private static JobFinishedEvent createJobFinishedEvent(JobImpl job) { private static JobFinishedEvent createJobFinishedEvent(JobImpl job) {
Counters mapCounters = new Counters();
Counters reduceCounters = new Counters();
for (Task t : job.tasks.values()) {
Counters counters = t.getCounters();
switch (t.getType()) {
case MAP: mapCounters.incrAllCounters(counters); break;
case REDUCE: reduceCounters.incrAllCounters(counters); break;
}
}
JobFinishedEvent jfe = new JobFinishedEvent( JobFinishedEvent jfe = new JobFinishedEvent(
job.oldJobId, job.finishTime, job.oldJobId, job.finishTime,
job.succeededMapTaskCount, job.succeededReduceTaskCount, job.succeededMapTaskCount, job.succeededReduceTaskCount,
job.failedMapTaskCount, job.failedReduceTaskCount, job.failedMapTaskCount, job.failedReduceTaskCount,
TypeConverter.fromYarn(job.getMapCounters()), mapCounters,
TypeConverter.fromYarn(job.getReduceCounters()), reduceCounters,
TypeConverter.fromYarn(job.getCounters())); job.getAllCounters());
return jfe; return jfe;
} }
@ -1450,7 +1402,8 @@ public void transition(JobImpl job, JobEvent event) {
JobCounterUpdateEvent jce = (JobCounterUpdateEvent) event; JobCounterUpdateEvent jce = (JobCounterUpdateEvent) event;
for (JobCounterUpdateEvent.CounterIncrementalUpdate ci : jce for (JobCounterUpdateEvent.CounterIncrementalUpdate ci : jce
.getCounterUpdates()) { .getCounterUpdates()) {
job.jobCounters.incrCounter(ci.getCounterKey(), ci.getIncrementValue()); job.jobCounters.findCounter(ci.getCounterKey()).increment(
ci.getIncrementValue());
} }
} }
} }

View File

@ -47,6 +47,8 @@
import org.apache.hadoop.mapred.TaskAttemptContextImpl; import org.apache.hadoop.mapred.TaskAttemptContextImpl;
import org.apache.hadoop.mapred.WrappedJvmID; import org.apache.hadoop.mapred.WrappedJvmID;
import org.apache.hadoop.mapred.WrappedProgressSplitsBlock; import org.apache.hadoop.mapred.WrappedProgressSplitsBlock;
import org.apache.hadoop.mapreduce.Counter;
import org.apache.hadoop.mapreduce.Counters;
import org.apache.hadoop.mapreduce.JobCounter; import org.apache.hadoop.mapreduce.JobCounter;
import org.apache.hadoop.mapreduce.MRJobConfig; import org.apache.hadoop.mapreduce.MRJobConfig;
import org.apache.hadoop.mapreduce.OutputCommitter; import org.apache.hadoop.mapreduce.OutputCommitter;
@ -60,8 +62,6 @@
import org.apache.hadoop.mapreduce.jobhistory.TaskAttemptUnsuccessfulCompletionEvent; import org.apache.hadoop.mapreduce.jobhistory.TaskAttemptUnsuccessfulCompletionEvent;
import org.apache.hadoop.mapreduce.security.TokenCache; import org.apache.hadoop.mapreduce.security.TokenCache;
import org.apache.hadoop.mapreduce.security.token.JobTokenIdentifier; import org.apache.hadoop.mapreduce.security.token.JobTokenIdentifier;
import org.apache.hadoop.mapreduce.v2.api.records.Counter;
import org.apache.hadoop.mapreduce.v2.api.records.Counters;
import org.apache.hadoop.mapreduce.v2.api.records.Phase; import org.apache.hadoop.mapreduce.v2.api.records.Phase;
import org.apache.hadoop.mapreduce.v2.api.records.TaskAttemptId; import org.apache.hadoop.mapreduce.v2.api.records.TaskAttemptId;
import org.apache.hadoop.mapreduce.v2.api.records.TaskAttemptReport; import org.apache.hadoop.mapreduce.v2.api.records.TaskAttemptReport;
@ -132,6 +132,7 @@ public abstract class TaskAttemptImpl implements
org.apache.hadoop.mapreduce.v2.app.job.TaskAttempt, org.apache.hadoop.mapreduce.v2.app.job.TaskAttempt,
EventHandler<TaskAttemptEvent> { EventHandler<TaskAttemptEvent> {
static final Counters EMPTY_COUNTERS = new Counters();
private static final Log LOG = LogFactory.getLog(TaskAttemptImpl.class); private static final Log LOG = LogFactory.getLog(TaskAttemptImpl.class);
private static final long MEMORY_SPLITS_RESOLUTION = 1024; //TODO Make configurable? private static final long MEMORY_SPLITS_RESOLUTION = 1024; //TODO Make configurable?
private static final int MAP_MEMORY_MB_DEFAULT = 1024; private static final int MAP_MEMORY_MB_DEFAULT = 1024;
@ -846,7 +847,7 @@ public TaskAttemptReport getReport() {
result.setDiagnosticInfo(StringUtils.join(LINE_SEPARATOR, getDiagnostics())); result.setDiagnosticInfo(StringUtils.join(LINE_SEPARATOR, getDiagnostics()));
result.setPhase(reportedStatus.phase); result.setPhase(reportedStatus.phase);
result.setStateString(reportedStatus.stateString); result.setStateString(reportedStatus.stateString);
result.setCounters(getCounters()); result.setCounters(TypeConverter.toYarn(getCounters()));
result.setContainerId(this.getAssignedContainerID()); result.setContainerId(this.getAssignedContainerID());
result.setNodeManagerHost(trackerName); result.setNodeManagerHost(trackerName);
result.setNodeManagerHttpPort(httpPort); result.setNodeManagerHttpPort(httpPort);
@ -877,7 +878,7 @@ public Counters getCounters() {
try { try {
Counters counters = reportedStatus.counters; Counters counters = reportedStatus.counters;
if (counters == null) { if (counters == null) {
counters = recordFactory.newRecordInstance(Counters.class); counters = EMPTY_COUNTERS;
// counters.groups = new HashMap<String, CounterGroup>(); // counters.groups = new HashMap<String, CounterGroup>();
} }
return counters; return counters;
@ -1031,22 +1032,21 @@ private void updateProgressSplits() {
(int) (now - start)); (int) (now - start));
} }
Counter cpuCounter = counters.getCounter( Counter cpuCounter = counters.findCounter(TaskCounter.CPU_MILLISECONDS);
TaskCounter.CPU_MILLISECONDS);
if (cpuCounter != null && cpuCounter.getValue() <= Integer.MAX_VALUE) { if (cpuCounter != null && cpuCounter.getValue() <= Integer.MAX_VALUE) {
splitsBlock.getProgressCPUTime().extend(newProgress, splitsBlock.getProgressCPUTime().extend(newProgress,
(int) cpuCounter.getValue()); (int) cpuCounter.getValue()); // long to int? TODO: FIX. Same below
} }
Counter virtualBytes = counters.getCounter( Counter virtualBytes = counters
TaskCounter.VIRTUAL_MEMORY_BYTES); .findCounter(TaskCounter.VIRTUAL_MEMORY_BYTES);
if (virtualBytes != null) { if (virtualBytes != null) {
splitsBlock.getProgressVirtualMemoryKbytes().extend(newProgress, splitsBlock.getProgressVirtualMemoryKbytes().extend(newProgress,
(int) (virtualBytes.getValue() / (MEMORY_SPLITS_RESOLUTION))); (int) (virtualBytes.getValue() / (MEMORY_SPLITS_RESOLUTION)));
} }
Counter physicalBytes = counters.getCounter( Counter physicalBytes = counters
TaskCounter.PHYSICAL_MEMORY_BYTES); .findCounter(TaskCounter.PHYSICAL_MEMORY_BYTES);
if (physicalBytes != null) { if (physicalBytes != null) {
splitsBlock.getProgressPhysicalMemoryKbytes().extend(newProgress, splitsBlock.getProgressPhysicalMemoryKbytes().extend(newProgress,
(int) (physicalBytes.getValue() / (MEMORY_SPLITS_RESOLUTION))); (int) (physicalBytes.getValue() / (MEMORY_SPLITS_RESOLUTION)));
@ -1343,7 +1343,7 @@ private void logAttemptFinishedEvent(TaskAttemptState state) {
this.containerNodeId == null ? -1 : this.containerNodeId.getPort(), this.containerNodeId == null ? -1 : this.containerNodeId.getPort(),
this.nodeRackName == null ? "UNKNOWN" : this.nodeRackName, this.nodeRackName == null ? "UNKNOWN" : this.nodeRackName,
this.reportedStatus.stateString, this.reportedStatus.stateString,
TypeConverter.fromYarn(getCounters()), getCounters(),
getProgressSplitBlock().burst()); getProgressSplitBlock().burst());
eventHandler.handle( eventHandler.handle(
new JobHistoryEvent(attemptId.getTaskId().getJobId(), mfe)); new JobHistoryEvent(attemptId.getTaskId().getJobId(), mfe));
@ -1360,7 +1360,7 @@ private void logAttemptFinishedEvent(TaskAttemptState state) {
this.containerNodeId == null ? -1 : this.containerNodeId.getPort(), this.containerNodeId == null ? -1 : this.containerNodeId.getPort(),
this.nodeRackName == null ? "UNKNOWN" : this.nodeRackName, this.nodeRackName == null ? "UNKNOWN" : this.nodeRackName,
this.reportedStatus.stateString, this.reportedStatus.stateString,
TypeConverter.fromYarn(getCounters()), getCounters(),
getProgressSplitBlock().burst()); getProgressSplitBlock().burst());
eventHandler.handle( eventHandler.handle(
new JobHistoryEvent(attemptId.getTaskId().getJobId(), rfe)); new JobHistoryEvent(attemptId.getTaskId().getJobId(), rfe));
@ -1498,8 +1498,8 @@ private void initTaskAttemptStatus(TaskAttemptStatus result) {
result.phase = Phase.STARTING; result.phase = Phase.STARTING;
result.stateString = "NEW"; result.stateString = "NEW";
result.taskState = TaskAttemptState.NEW; result.taskState = TaskAttemptState.NEW;
Counters counters = recordFactory.newRecordInstance(Counters.class); Counters counters = EMPTY_COUNTERS;
// counters.groups = new HashMap<String, CounterGroup>(); // counters.groups = new HashMap<String, CounterGroup>();
result.counters = counters; result.counters = counters;
} }

View File

@ -33,6 +33,7 @@
import org.apache.commons.logging.LogFactory; import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.fs.Path; import org.apache.hadoop.fs.Path;
import org.apache.hadoop.mapred.JobConf; import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapreduce.Counters;
import org.apache.hadoop.mapreduce.OutputCommitter; import org.apache.hadoop.mapreduce.OutputCommitter;
import org.apache.hadoop.mapreduce.TypeConverter; import org.apache.hadoop.mapreduce.TypeConverter;
import org.apache.hadoop.mapreduce.jobhistory.JobHistoryEvent; import org.apache.hadoop.mapreduce.jobhistory.JobHistoryEvent;
@ -40,7 +41,6 @@
import org.apache.hadoop.mapreduce.jobhistory.TaskFinishedEvent; import org.apache.hadoop.mapreduce.jobhistory.TaskFinishedEvent;
import org.apache.hadoop.mapreduce.jobhistory.TaskStartedEvent; import org.apache.hadoop.mapreduce.jobhistory.TaskStartedEvent;
import org.apache.hadoop.mapreduce.security.token.JobTokenIdentifier; import org.apache.hadoop.mapreduce.security.token.JobTokenIdentifier;
import org.apache.hadoop.mapreduce.v2.api.records.Counters;
import org.apache.hadoop.mapreduce.v2.api.records.JobId; import org.apache.hadoop.mapreduce.v2.api.records.JobId;
import org.apache.hadoop.mapreduce.v2.api.records.TaskAttemptCompletionEvent; import org.apache.hadoop.mapreduce.v2.api.records.TaskAttemptCompletionEvent;
import org.apache.hadoop.mapreduce.v2.api.records.TaskAttemptCompletionEventStatus; import org.apache.hadoop.mapreduce.v2.api.records.TaskAttemptCompletionEventStatus;
@ -329,7 +329,6 @@ public TaskReport getReport() {
report.setFinishTime(getFinishTime()); report.setFinishTime(getFinishTime());
report.setTaskState(getState()); report.setTaskState(getState());
report.setProgress(getProgress()); report.setProgress(getProgress());
report.setCounters(getCounters());
for (TaskAttempt attempt : attempts.values()) { for (TaskAttempt attempt : attempts.values()) {
if (TaskAttemptState.RUNNING.equals(attempt.getState())) { if (TaskAttemptState.RUNNING.equals(attempt.getState())) {
@ -346,6 +345,11 @@ public TaskReport getReport() {
} }
} }
// Add a copy of counters as the last step so that their lifetime on heap
// is as small as possible.
report.setCounters(TypeConverter.toYarn(getCounters()));
return report; return report;
} finally { } finally {
readLock.unlock(); readLock.unlock();
@ -361,7 +365,7 @@ public Counters getCounters() {
if (bestAttempt != null) { if (bestAttempt != null) {
counters = bestAttempt.getCounters(); counters = bestAttempt.getCounters();
} else { } else {
counters = recordFactory.newRecordInstance(Counters.class); counters = TaskAttemptImpl.EMPTY_COUNTERS;
// counters.groups = new HashMap<CharSequence, CounterGroup>(); // counters.groups = new HashMap<CharSequence, CounterGroup>();
} }
return counters; return counters;
@ -595,7 +599,7 @@ private static TaskFinishedEvent createTaskFinishedEvent(TaskImpl task, TaskStat
task.getFinishTime(task.successfulAttempt), task.getFinishTime(task.successfulAttempt),
TypeConverter.fromYarn(task.taskId.getTaskType()), TypeConverter.fromYarn(task.taskId.getTaskType()),
taskState.toString(), taskState.toString(),
TypeConverter.fromYarn(task.getCounters())); task.getCounters());
return tfe; return tfe;
} }

View File

@ -92,7 +92,6 @@
//TODO: //TODO:
//task cleanup for all non completed tasks //task cleanup for all non completed tasks
public class RecoveryService extends CompositeService implements Recovery { public class RecoveryService extends CompositeService implements Recovery {
private static final Log LOG = LogFactory.getLog(RecoveryService.class); private static final Log LOG = LogFactory.getLog(RecoveryService.class);
@ -411,8 +410,7 @@ private void sendStatusUpdateEvent(TaskAttemptId yarnAttemptID,
if (cntrs == null) { if (cntrs == null) {
taskAttemptStatus.counters = null; taskAttemptStatus.counters = null;
} else { } else {
taskAttemptStatus.counters = TypeConverter.toYarn(attemptInfo taskAttemptStatus.counters = cntrs;
.getCounters());
} }
actualHandler.handle(new TaskAttemptStatusUpdateEvent( actualHandler.handle(new TaskAttemptStatusUpdateEvent(
taskAttemptStatus.id, taskAttemptStatus)); taskAttemptStatus.id, taskAttemptStatus));

View File

@ -18,25 +18,32 @@
package org.apache.hadoop.mapreduce.v2.app.webapp; package org.apache.hadoop.mapreduce.v2.app.webapp;
import com.google.inject.Inject; import static org.apache.hadoop.mapreduce.v2.app.webapp.AMParams.JOB_ID;
import static org.apache.hadoop.mapreduce.v2.app.webapp.AMParams.TASK_ID;
import static org.apache.hadoop.yarn.webapp.view.JQueryUI.C_TABLE;
import static org.apache.hadoop.yarn.webapp.view.JQueryUI._INFO_WRAP;
import java.util.Map; import java.util.Map;
import org.apache.hadoop.mapreduce.v2.api.records.Counter; import org.apache.hadoop.mapreduce.Counter;
import org.apache.hadoop.mapreduce.v2.api.records.CounterGroup; import org.apache.hadoop.mapreduce.CounterGroup;
import org.apache.hadoop.mapreduce.v2.api.records.Counters; import org.apache.hadoop.mapreduce.Counters;
import org.apache.hadoop.mapreduce.v2.api.records.JobId; import org.apache.hadoop.mapreduce.v2.api.records.JobId;
import org.apache.hadoop.mapreduce.v2.api.records.TaskId; import org.apache.hadoop.mapreduce.v2.api.records.TaskId;
import org.apache.hadoop.mapreduce.v2.app.AppContext; import org.apache.hadoop.mapreduce.v2.app.AppContext;
import org.apache.hadoop.mapreduce.v2.app.job.Job; import org.apache.hadoop.mapreduce.v2.app.job.Job;
import org.apache.hadoop.mapreduce.v2.app.job.Task; import org.apache.hadoop.mapreduce.v2.app.job.Task;
import org.apache.hadoop.mapreduce.v2.app.job.impl.JobImpl;
import org.apache.hadoop.mapreduce.v2.util.MRApps; import org.apache.hadoop.mapreduce.v2.util.MRApps;
import org.apache.hadoop.yarn.webapp.hamlet.Hamlet; import org.apache.hadoop.yarn.webapp.hamlet.Hamlet;
import org.apache.hadoop.yarn.webapp.hamlet.Hamlet.*; import org.apache.hadoop.yarn.webapp.hamlet.Hamlet.DIV;
import org.apache.hadoop.yarn.webapp.hamlet.Hamlet.TABLE;
import org.apache.hadoop.yarn.webapp.hamlet.Hamlet.TBODY;
import org.apache.hadoop.yarn.webapp.hamlet.Hamlet.TD;
import org.apache.hadoop.yarn.webapp.hamlet.Hamlet.THEAD;
import org.apache.hadoop.yarn.webapp.hamlet.Hamlet.TR;
import org.apache.hadoop.yarn.webapp.view.HtmlBlock; import org.apache.hadoop.yarn.webapp.view.HtmlBlock;
import static org.apache.hadoop.mapreduce.v2.app.webapp.AMWebApp.*; import com.google.inject.Inject;
import static org.apache.hadoop.yarn.webapp.view.JQueryUI.*;
public class CountersBlock extends HtmlBlock { public class CountersBlock extends HtmlBlock {
Job job; Job job;
@ -62,8 +69,7 @@ public class CountersBlock extends HtmlBlock {
return; return;
} }
if(total == null || total.getAllCounterGroups() == null || if(total == null || total.getGroupNames() == null) {
total.getAllCounterGroups().size() <= 0) {
String type = $(TASK_ID); String type = $(TASK_ID);
if(type == null || type.isEmpty()) { if(type == null || type.isEmpty()) {
type = $(JOB_ID, "the job"); type = $(JOB_ID, "the job");
@ -93,9 +99,9 @@ public class CountersBlock extends HtmlBlock {
th(".group.ui-state-default", "Counter Group"). th(".group.ui-state-default", "Counter Group").
th(".ui-state-default", "Counters")._()._(). th(".ui-state-default", "Counters")._()._().
tbody(); tbody();
for (CounterGroup g : total.getAllCounterGroups().values()) { for (CounterGroup g : total) {
CounterGroup mg = map == null ? null : map.getCounterGroup(g.getName()); CounterGroup mg = map == null ? null : map.getGroup(g.getName());
CounterGroup rg = reduce == null ? null : reduce.getCounterGroup(g.getName()); CounterGroup rg = reduce == null ? null : reduce.getGroup(g.getName());
++numGroups; ++numGroups;
// This is mostly for demonstration :) Typically we'd introduced // This is mostly for demonstration :) Typically we'd introduced
// a CounterGroup block to reduce the verbosity. OTOH, this // a CounterGroup block to reduce the verbosity. OTOH, this
@ -116,7 +122,7 @@ public class CountersBlock extends HtmlBlock {
TBODY<TABLE<TD<TR<TBODY<TABLE<DIV<Hamlet>>>>>>> group = groupHeadRow. TBODY<TABLE<TD<TR<TBODY<TABLE<DIV<Hamlet>>>>>>> group = groupHeadRow.
th(map == null ? "Value" : "Total")._()._(). th(map == null ? "Value" : "Total")._()._().
tbody(); tbody();
for (Counter counter : g.getAllCounters().values()) { for (Counter counter : g) {
// Ditto // Ditto
TR<TBODY<TABLE<TD<TR<TBODY<TABLE<DIV<Hamlet>>>>>>>> groupRow = group. TR<TBODY<TABLE<TD<TR<TBODY<TABLE<DIV<Hamlet>>>>>>>> groupRow = group.
tr(); tr();
@ -130,8 +136,8 @@ public class CountersBlock extends HtmlBlock {
_(); _();
} }
if (map != null) { if (map != null) {
Counter mc = mg == null ? null : mg.getCounter(counter.getName()); Counter mc = mg == null ? null : mg.findCounter(counter.getName());
Counter rc = rg == null ? null : rg.getCounter(counter.getName()); Counter rc = rg == null ? null : rg.findCounter(counter.getName());
groupRow. groupRow.
td(mc == null ? "0" : String.valueOf(mc.getValue())). td(mc == null ? "0" : String.valueOf(mc.getValue())).
td(rc == null ? "0" : String.valueOf(rc.getValue())); td(rc == null ? "0" : String.valueOf(rc.getValue()));
@ -173,14 +179,14 @@ private void getCounters(AppContext ctx) {
} }
// Get all types of counters // Get all types of counters
Map<TaskId, Task> tasks = job.getTasks(); Map<TaskId, Task> tasks = job.getTasks();
total = job.getCounters(); total = job.getAllCounters();
map = JobImpl.newCounters(); map = new Counters();
reduce = JobImpl.newCounters(); reduce = new Counters();
for (Task t : tasks.values()) { for (Task t : tasks.values()) {
Counters counters = t.getCounters(); Counters counters = t.getCounters();
switch (t.getType()) { switch (t.getType()) {
case MAP: JobImpl.incrAllCounters(map, counters); break; case MAP: map.incrAllCounters(counters); break;
case REDUCE: JobImpl.incrAllCounters(reduce, counters); break; case REDUCE: reduce.incrAllCounters(counters); break;
} }
} }
} }

View File

@ -18,13 +18,18 @@
package org.apache.hadoop.mapreduce.v2.app.webapp; package org.apache.hadoop.mapreduce.v2.app.webapp;
import com.google.inject.Inject; import static org.apache.hadoop.mapreduce.v2.app.webapp.AMParams.COUNTER_GROUP;
import static org.apache.hadoop.mapreduce.v2.app.webapp.AMParams.COUNTER_NAME;
import static org.apache.hadoop.mapreduce.v2.app.webapp.AMParams.JOB_ID;
import static org.apache.hadoop.mapreduce.v2.app.webapp.AMParams.TASK_ID;
import static org.apache.hadoop.yarn.webapp.view.JQueryUI._INFO_WRAP;
import java.util.Map; import java.util.Map;
import java.util.TreeMap; import java.util.TreeMap;
import org.apache.hadoop.mapreduce.v2.api.records.Counter; import org.apache.hadoop.mapreduce.Counter;
import org.apache.hadoop.mapreduce.v2.api.records.CounterGroup; import org.apache.hadoop.mapreduce.CounterGroup;
import org.apache.hadoop.mapreduce.v2.api.records.Counters; import org.apache.hadoop.mapreduce.Counters;
import org.apache.hadoop.mapreduce.v2.api.records.JobId; import org.apache.hadoop.mapreduce.v2.api.records.JobId;
import org.apache.hadoop.mapreduce.v2.api.records.TaskAttemptId; import org.apache.hadoop.mapreduce.v2.api.records.TaskAttemptId;
import org.apache.hadoop.mapreduce.v2.api.records.TaskId; import org.apache.hadoop.mapreduce.v2.api.records.TaskId;
@ -40,8 +45,7 @@
import org.apache.hadoop.yarn.webapp.hamlet.Hamlet.TR; import org.apache.hadoop.yarn.webapp.hamlet.Hamlet.TR;
import org.apache.hadoop.yarn.webapp.view.HtmlBlock; import org.apache.hadoop.yarn.webapp.view.HtmlBlock;
import static org.apache.hadoop.mapreduce.v2.app.webapp.AMWebApp.*; import com.google.inject.Inject;
import static org.apache.hadoop.yarn.webapp.view.JQueryUI.*;
public class SingleCounterBlock extends HtmlBlock { public class SingleCounterBlock extends HtmlBlock {
protected TreeMap<String, Long> values = new TreeMap<String, Long>(); protected TreeMap<String, Long> values = new TreeMap<String, Long>();
@ -122,10 +126,10 @@ private void populateMembers(AppContext ctx) {
task.getAttempts().entrySet()) { task.getAttempts().entrySet()) {
long value = 0; long value = 0;
Counters counters = entry.getValue().getCounters(); Counters counters = entry.getValue().getCounters();
CounterGroup group = (counters != null) CounterGroup group = (counters != null) ? counters
? counters.getCounterGroup($(COUNTER_GROUP)) : null; .getGroup($(COUNTER_GROUP)) : null;
if(group != null) { if(group != null) {
Counter c = group.getCounter($(COUNTER_NAME)); Counter c = group.findCounter($(COUNTER_NAME));
if(c != null) { if(c != null) {
value = c.getValue(); value = c.getValue();
} }
@ -140,9 +144,9 @@ private void populateMembers(AppContext ctx) {
for(Map.Entry<TaskId, Task> entry : tasks.entrySet()) { for(Map.Entry<TaskId, Task> entry : tasks.entrySet()) {
long value = 0; long value = 0;
CounterGroup group = entry.getValue().getCounters() CounterGroup group = entry.getValue().getCounters()
.getCounterGroup($(COUNTER_GROUP)); .getGroup($(COUNTER_GROUP));
if(group != null) { if(group != null) {
Counter c = group.getCounter($(COUNTER_NAME)); Counter c = group.findCounter($(COUNTER_NAME));
if(c != null) { if(c != null) {
value = c.getValue(); value = c.getValue();
} }

View File

@ -24,8 +24,8 @@
import javax.xml.bind.annotation.XmlElement; import javax.xml.bind.annotation.XmlElement;
import javax.xml.bind.annotation.XmlRootElement; import javax.xml.bind.annotation.XmlRootElement;
import org.apache.hadoop.mapreduce.v2.api.records.Counter; import org.apache.hadoop.mapreduce.Counter;
import org.apache.hadoop.mapreduce.v2.api.records.CounterGroup; import org.apache.hadoop.mapreduce.CounterGroup;
@XmlRootElement(name = "counterGroup") @XmlRootElement(name = "counterGroup")
@XmlAccessorType(XmlAccessType.FIELD) @XmlAccessorType(XmlAccessType.FIELD)
@ -38,14 +38,14 @@ public class CounterGroupInfo {
public CounterGroupInfo() { public CounterGroupInfo() {
} }
public CounterGroupInfo(String name, CounterGroup g, CounterGroup mg, public CounterGroupInfo(String name, CounterGroup group, CounterGroup mg,
CounterGroup rg) { CounterGroup rg) {
this.counterGroupName = name; this.counterGroupName = name;
this.counter = new ArrayList<CounterInfo>(); this.counter = new ArrayList<CounterInfo>();
for (Counter c : g.getAllCounters().values()) { for (Counter c : group) {
Counter mc = mg == null ? null : mg.getCounter(c.getName()); Counter mc = mg == null ? null : mg.findCounter(c.getName());
Counter rc = rg == null ? null : rg.getCounter(c.getName()); Counter rc = rg == null ? null : rg.findCounter(c.getName());
CounterInfo cinfo = new CounterInfo(c, mc, rc); CounterInfo cinfo = new CounterInfo(c, mc, rc);
this.counter.add(cinfo); this.counter.add(cinfo);
} }

View File

@ -21,7 +21,7 @@
import javax.xml.bind.annotation.XmlAccessorType; import javax.xml.bind.annotation.XmlAccessorType;
import javax.xml.bind.annotation.XmlRootElement; import javax.xml.bind.annotation.XmlRootElement;
import org.apache.hadoop.mapreduce.v2.api.records.Counter; import org.apache.hadoop.mapreduce.Counter;
@XmlRootElement @XmlRootElement
@XmlAccessorType(XmlAccessType.FIELD) @XmlAccessorType(XmlAccessType.FIELD)
@ -35,9 +35,9 @@ public class CounterInfo {
public CounterInfo() { public CounterInfo() {
} }
public CounterInfo(Counter counter, Counter mc, Counter rc) { public CounterInfo(Counter c, Counter mc, Counter rc) {
this.name = counter.getName(); this.name = c.getName();
this.totalCounterValue = counter.getValue(); this.totalCounterValue = c.getValue();
this.mapCounterValue = mc == null ? 0 : mc.getValue(); this.mapCounterValue = mc == null ? 0 : mc.getValue();
this.reduceCounterValue = rc == null ? 0 : rc.getValue(); this.reduceCounterValue = rc == null ? 0 : rc.getValue();
} }

View File

@ -25,13 +25,12 @@
import javax.xml.bind.annotation.XmlRootElement; import javax.xml.bind.annotation.XmlRootElement;
import javax.xml.bind.annotation.XmlTransient; import javax.xml.bind.annotation.XmlTransient;
import org.apache.hadoop.mapreduce.v2.api.records.CounterGroup; import org.apache.hadoop.mapreduce.CounterGroup;
import org.apache.hadoop.mapreduce.v2.api.records.Counters; import org.apache.hadoop.mapreduce.Counters;
import org.apache.hadoop.mapreduce.v2.api.records.TaskId; import org.apache.hadoop.mapreduce.v2.api.records.TaskId;
import org.apache.hadoop.mapreduce.v2.app.AppContext; import org.apache.hadoop.mapreduce.v2.app.AppContext;
import org.apache.hadoop.mapreduce.v2.app.job.Job; import org.apache.hadoop.mapreduce.v2.app.job.Job;
import org.apache.hadoop.mapreduce.v2.app.job.Task; import org.apache.hadoop.mapreduce.v2.app.job.Task;
import org.apache.hadoop.mapreduce.v2.app.job.impl.JobImpl;
import org.apache.hadoop.mapreduce.v2.util.MRApps; import org.apache.hadoop.mapreduce.v2.util.MRApps;
@XmlRootElement(name = "jobCounters") @XmlRootElement(name = "jobCounters")
@ -56,18 +55,15 @@ public JobCounterInfo(AppContext ctx, Job job) {
counterGroup = new ArrayList<CounterGroupInfo>(); counterGroup = new ArrayList<CounterGroupInfo>();
this.id = MRApps.toString(job.getID()); this.id = MRApps.toString(job.getID());
int numGroups = 0;
if (total != null) { if (total != null) {
for (CounterGroup g : total.getAllCounterGroups().values()) { for (CounterGroup g : total) {
if (g != null) { if (g != null) {
CounterGroup mg = map == null ? null : map.getCounterGroup(g CounterGroup mg = map == null ? null : map.getGroup(g.getName());
.getName()); CounterGroup rg = reduce == null ? null : reduce
CounterGroup rg = reduce == null ? null : reduce.getCounterGroup(g .getGroup(g.getName());
.getName());
++numGroups;
CounterGroupInfo cginfo = new CounterGroupInfo(g.getName(), g, mg, rg); CounterGroupInfo cginfo = new CounterGroupInfo(g.getName(), g,
mg, rg);
counterGroup.add(cginfo); counterGroup.add(cginfo);
} }
} }
@ -75,23 +71,23 @@ public JobCounterInfo(AppContext ctx, Job job) {
} }
private void getCounters(AppContext ctx, Job job) { private void getCounters(AppContext ctx, Job job) {
total = JobImpl.newCounters(); total = new Counters();
if (job == null) { if (job == null) {
return; return;
} }
map = JobImpl.newCounters(); map = new Counters();
reduce = JobImpl.newCounters(); reduce = new Counters();
// Get all types of counters // Get all types of counters
Map<TaskId, Task> tasks = job.getTasks(); Map<TaskId, Task> tasks = job.getTasks();
for (Task t : tasks.values()) { for (Task t : tasks.values()) {
Counters counters = t.getCounters(); Counters counters = t.getCounters();
JobImpl.incrAllCounters(total, counters); total.incrAllCounters(counters);
switch (t.getType()) { switch (t.getType()) {
case MAP: case MAP:
JobImpl.incrAllCounters(map, counters); map.incrAllCounters(counters);
break; break;
case REDUCE: case REDUCE:
JobImpl.incrAllCounters(reduce, counters); reduce.incrAllCounters(counters);
break; break;
} }
} }

View File

@ -25,8 +25,8 @@
import javax.xml.bind.annotation.XmlRootElement; import javax.xml.bind.annotation.XmlRootElement;
import javax.xml.bind.annotation.XmlTransient; import javax.xml.bind.annotation.XmlTransient;
import org.apache.hadoop.mapreduce.v2.api.records.CounterGroup; import org.apache.hadoop.mapreduce.CounterGroup;
import org.apache.hadoop.mapreduce.v2.api.records.Counters; import org.apache.hadoop.mapreduce.Counters;
import org.apache.hadoop.mapreduce.v2.app.job.TaskAttempt; import org.apache.hadoop.mapreduce.v2.app.job.TaskAttempt;
import org.apache.hadoop.mapreduce.v2.util.MRApps; import org.apache.hadoop.mapreduce.v2.util.MRApps;
@ -49,7 +49,7 @@ public JobTaskAttemptCounterInfo(TaskAttempt taskattempt) {
total = taskattempt.getCounters(); total = taskattempt.getCounters();
taskAttemptCounterGroup = new ArrayList<TaskCounterGroupInfo>(); taskAttemptCounterGroup = new ArrayList<TaskCounterGroupInfo>();
if (total != null) { if (total != null) {
for (CounterGroup g : total.getAllCounterGroups().values()) { for (CounterGroup g : total) {
if (g != null) { if (g != null) {
TaskCounterGroupInfo cginfo = new TaskCounterGroupInfo(g.getName(), g); TaskCounterGroupInfo cginfo = new TaskCounterGroupInfo(g.getName(), g);
if (cginfo != null) { if (cginfo != null) {

View File

@ -25,8 +25,8 @@
import javax.xml.bind.annotation.XmlRootElement; import javax.xml.bind.annotation.XmlRootElement;
import javax.xml.bind.annotation.XmlTransient; import javax.xml.bind.annotation.XmlTransient;
import org.apache.hadoop.mapreduce.v2.api.records.CounterGroup; import org.apache.hadoop.mapreduce.CounterGroup;
import org.apache.hadoop.mapreduce.v2.api.records.Counters; import org.apache.hadoop.mapreduce.Counters;
import org.apache.hadoop.mapreduce.v2.app.job.Task; import org.apache.hadoop.mapreduce.v2.app.job.Task;
import org.apache.hadoop.mapreduce.v2.util.MRApps; import org.apache.hadoop.mapreduce.v2.util.MRApps;
@ -48,7 +48,7 @@ public JobTaskCounterInfo(Task task) {
this.id = MRApps.toString(task.getID()); this.id = MRApps.toString(task.getID());
taskCounterGroup = new ArrayList<TaskCounterGroupInfo>(); taskCounterGroup = new ArrayList<TaskCounterGroupInfo>();
if (total != null) { if (total != null) {
for (CounterGroup g : total.getAllCounterGroups().values()) { for (CounterGroup g : total) {
if (g != null) { if (g != null) {
TaskCounterGroupInfo cginfo = new TaskCounterGroupInfo(g.getName(), g); TaskCounterGroupInfo cginfo = new TaskCounterGroupInfo(g.getName(), g);
taskCounterGroup.add(cginfo); taskCounterGroup.add(cginfo);

View File

@ -24,8 +24,8 @@
import javax.xml.bind.annotation.XmlAccessorType; import javax.xml.bind.annotation.XmlAccessorType;
import javax.xml.bind.annotation.XmlRootElement; import javax.xml.bind.annotation.XmlRootElement;
import org.apache.hadoop.mapreduce.v2.api.records.Counter; import org.apache.hadoop.mapreduce.Counter;
import org.apache.hadoop.mapreduce.v2.api.records.CounterGroup; import org.apache.hadoop.mapreduce.CounterGroup;
@XmlRootElement @XmlRootElement
@XmlAccessorType(XmlAccessType.FIELD) @XmlAccessorType(XmlAccessType.FIELD)
@ -37,11 +37,11 @@ public class TaskCounterGroupInfo {
public TaskCounterGroupInfo() { public TaskCounterGroupInfo() {
} }
public TaskCounterGroupInfo(String name, CounterGroup g) { public TaskCounterGroupInfo(String name, CounterGroup group) {
this.counterGroupName = name; this.counterGroupName = name;
this.counter = new ArrayList<TaskCounterInfo>(); this.counter = new ArrayList<TaskCounterInfo>();
for (Counter c : g.getAllCounters().values()) { for (Counter c : group) {
TaskCounterInfo cinfo = new TaskCounterInfo(c.getName(), c.getValue()); TaskCounterInfo cinfo = new TaskCounterInfo(c.getName(), c.getValue());
this.counter.add(cinfo); this.counter.add(cinfo);
} }

View File

@ -30,6 +30,7 @@
import org.apache.hadoop.fs.Path; import org.apache.hadoop.fs.Path;
import org.apache.hadoop.mapred.JobACLsManager; import org.apache.hadoop.mapred.JobACLsManager;
import org.apache.hadoop.mapred.ShuffleHandler; import org.apache.hadoop.mapred.ShuffleHandler;
import org.apache.hadoop.mapreduce.Counters;
import org.apache.hadoop.mapreduce.FileSystemCounter; import org.apache.hadoop.mapreduce.FileSystemCounter;
import org.apache.hadoop.mapreduce.JobACL; import org.apache.hadoop.mapreduce.JobACL;
import org.apache.hadoop.mapreduce.JobCounter; import org.apache.hadoop.mapreduce.JobCounter;
@ -37,7 +38,6 @@
import org.apache.hadoop.mapreduce.TaskCounter; import org.apache.hadoop.mapreduce.TaskCounter;
import org.apache.hadoop.mapreduce.TypeConverter; import org.apache.hadoop.mapreduce.TypeConverter;
import org.apache.hadoop.mapreduce.v2.api.records.AMInfo; import org.apache.hadoop.mapreduce.v2.api.records.AMInfo;
import org.apache.hadoop.mapreduce.v2.api.records.Counters;
import org.apache.hadoop.mapreduce.v2.api.records.JobId; import org.apache.hadoop.mapreduce.v2.api.records.JobId;
import org.apache.hadoop.mapreduce.v2.api.records.JobReport; import org.apache.hadoop.mapreduce.v2.api.records.JobReport;
import org.apache.hadoop.mapreduce.v2.api.records.JobState; import org.apache.hadoop.mapreduce.v2.api.records.JobState;
@ -144,7 +144,7 @@ public static TaskReport newTaskReport(TaskId id) {
report.setFinishTime(System.currentTimeMillis() report.setFinishTime(System.currentTimeMillis()
+ (int) (Math.random() * DT) + 1); + (int) (Math.random() * DT) + 1);
report.setProgress((float) Math.random()); report.setProgress((float) Math.random());
report.setCounters(newCounters()); report.setCounters(TypeConverter.toYarn(newCounters()));
report.setTaskState(TASK_STATES.next()); report.setTaskState(TASK_STATES.next());
return report; return report;
} }
@ -159,13 +159,12 @@ public static TaskAttemptReport newTaskAttemptReport(TaskAttemptId id) {
report.setPhase(PHASES.next()); report.setPhase(PHASES.next());
report.setTaskAttemptState(TASK_ATTEMPT_STATES.next()); report.setTaskAttemptState(TASK_ATTEMPT_STATES.next());
report.setProgress((float) Math.random()); report.setProgress((float) Math.random());
report.setCounters(newCounters()); report.setCounters(TypeConverter.toYarn(newCounters()));
return report; return report;
} }
@SuppressWarnings("deprecation")
public static Counters newCounters() { public static Counters newCounters() {
org.apache.hadoop.mapred.Counters hc = new org.apache.hadoop.mapred.Counters(); Counters hc = new Counters();
for (JobCounter c : JobCounter.values()) { for (JobCounter c : JobCounter.values()) {
hc.findCounter(c).setValue((long) (Math.random() * 1000)); hc.findCounter(c).setValue((long) (Math.random() * 1000));
} }
@ -183,7 +182,7 @@ public static Counters newCounters() {
hc.findCounter(USER_COUNTER_GROUPS.next(), USER_COUNTERS.next()) hc.findCounter(USER_COUNTER_GROUPS.next(), USER_COUNTERS.next())
.setValue((long) (Math.random() * 100000)); .setValue((long) (Math.random() * 100000));
} }
return TypeConverter.toYarn(hc); return hc;
} }
public static Map<TaskAttemptId, TaskAttempt> newTaskAttempts(TaskId tid, public static Map<TaskAttemptId, TaskAttempt> newTaskAttempts(TaskId tid,
@ -231,7 +230,10 @@ public int getShufflePort() {
@Override @Override
public Counters getCounters() { public Counters getCounters() {
return report.getCounters(); if (report != null && report.getCounters() != null) {
return new Counters(TypeConverter.fromYarn(report.getCounters()));
}
return null;
} }
@Override @Override
@ -327,7 +329,8 @@ public TaskReport getReport() {
@Override @Override
public Counters getCounters() { public Counters getCounters() {
return report.getCounters(); return new Counters(
TypeConverter.fromYarn(report.getCounters()));
} }
@Override @Override
@ -373,8 +376,9 @@ public TaskState getState() {
}; };
} }
public static Counters getCounters(Collection<Task> tasks) { public static Counters getCounters(
Counters counters = JobImpl.newCounters(); Collection<Task> tasks) {
Counters counters = new Counters();
return JobImpl.incrTaskCounters(counters, tasks); return JobImpl.incrTaskCounters(counters, tasks);
} }
@ -419,7 +423,8 @@ public static Job newJob(ApplicationId appID, int i, int n, int m, Path confFile
final JobReport report = newJobReport(id); final JobReport report = newJobReport(id);
final Map<TaskId, Task> tasks = newTasks(id, n, m); final Map<TaskId, Task> tasks = newTasks(id, n, m);
final TaskCount taskCount = getTaskCount(tasks.values()); final TaskCount taskCount = getTaskCount(tasks.values());
final Counters counters = getCounters(tasks.values()); final Counters counters = getCounters(tasks
.values());
final Path configFile = confFile; final Path configFile = confFile;
Map<JobACL, AccessControlList> tmpJobACLs = new HashMap<JobACL, AccessControlList>(); Map<JobACL, AccessControlList> tmpJobACLs = new HashMap<JobACL, AccessControlList>();
@ -457,7 +462,7 @@ public float getProgress() {
} }
@Override @Override
public Counters getCounters() { public Counters getAllCounters() {
return counters; return counters;
} }

View File

@ -18,9 +18,6 @@
package org.apache.hadoop.mapreduce.v2.app; package org.apache.hadoop.mapreduce.v2.app;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import java.util.Collection; import java.util.Collection;
import java.util.Collections; import java.util.Collections;
import java.util.HashMap; import java.util.HashMap;
@ -30,11 +27,14 @@
import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong; import java.util.concurrent.atomic.AtomicLong;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path; import org.apache.hadoop.fs.Path;
import org.apache.hadoop.mapreduce.Counters;
import org.apache.hadoop.mapreduce.JobACL; import org.apache.hadoop.mapreduce.JobACL;
import org.apache.hadoop.mapreduce.v2.api.records.AMInfo; import org.apache.hadoop.mapreduce.v2.api.records.AMInfo;
import org.apache.hadoop.mapreduce.v2.api.records.Counters;
import org.apache.hadoop.mapreduce.v2.api.records.JobId; import org.apache.hadoop.mapreduce.v2.api.records.JobId;
import org.apache.hadoop.mapreduce.v2.api.records.JobReport; import org.apache.hadoop.mapreduce.v2.api.records.JobReport;
import org.apache.hadoop.mapreduce.v2.api.records.JobState; import org.apache.hadoop.mapreduce.v2.api.records.JobState;
@ -46,13 +46,12 @@
import org.apache.hadoop.mapreduce.v2.api.records.TaskReport; import org.apache.hadoop.mapreduce.v2.api.records.TaskReport;
import org.apache.hadoop.mapreduce.v2.api.records.TaskState; import org.apache.hadoop.mapreduce.v2.api.records.TaskState;
import org.apache.hadoop.mapreduce.v2.api.records.TaskType; import org.apache.hadoop.mapreduce.v2.api.records.TaskType;
import org.apache.hadoop.mapreduce.v2.app.AppContext;
import org.apache.hadoop.mapreduce.v2.app.job.Job; import org.apache.hadoop.mapreduce.v2.app.job.Job;
import org.apache.hadoop.mapreduce.v2.app.job.Task; import org.apache.hadoop.mapreduce.v2.app.job.Task;
import org.apache.hadoop.mapreduce.v2.app.job.TaskAttempt; import org.apache.hadoop.mapreduce.v2.app.job.TaskAttempt;
import org.apache.hadoop.mapreduce.v2.app.job.event.TaskAttemptStatusUpdateEvent.TaskAttemptStatus;
import org.apache.hadoop.mapreduce.v2.app.job.event.TaskEvent; import org.apache.hadoop.mapreduce.v2.app.job.event.TaskEvent;
import org.apache.hadoop.mapreduce.v2.app.job.event.TaskEventType; import org.apache.hadoop.mapreduce.v2.app.job.event.TaskEventType;
import org.apache.hadoop.mapreduce.v2.app.job.event.TaskAttemptStatusUpdateEvent.TaskAttemptStatus;
import org.apache.hadoop.mapreduce.v2.app.speculate.DefaultSpeculator; import org.apache.hadoop.mapreduce.v2.app.speculate.DefaultSpeculator;
import org.apache.hadoop.mapreduce.v2.app.speculate.ExponentiallySmoothedTaskRuntimeEstimator; import org.apache.hadoop.mapreduce.v2.app.speculate.ExponentiallySmoothedTaskRuntimeEstimator;
import org.apache.hadoop.mapreduce.v2.app.speculate.LegacyTaskRuntimeEstimator; import org.apache.hadoop.mapreduce.v2.app.speculate.LegacyTaskRuntimeEstimator;
@ -74,7 +73,7 @@
import org.junit.Assert; import org.junit.Assert;
import org.junit.Test; import org.junit.Test;
@SuppressWarnings({"unchecked", "rawtypes"})
public class TestRuntimeEstimators { public class TestRuntimeEstimators {
private static int INITIAL_NUMBER_FREE_SLOTS = 600; private static int INITIAL_NUMBER_FREE_SLOTS = 600;
@ -399,7 +398,7 @@ public float getProgress() {
} }
@Override @Override
public Counters getCounters() { public Counters getAllCounters() {
throw new UnsupportedOperationException("Not supported yet."); throw new UnsupportedOperationException("Not supported yet.");
} }

View File

@ -686,7 +686,7 @@ public void verifyAMJobTaskAttemptCounters(JSONObject info, TaskAttempt att)
assertTrue("name not set", (name != null && !name.isEmpty())); assertTrue("name not set", (name != null && !name.isEmpty()));
JSONArray counters = counterGroup.getJSONArray("counter"); JSONArray counters = counterGroup.getJSONArray("counter");
for (int j = 0; j < counters.length(); j++) { for (int j = 0; j < counters.length(); j++) {
JSONObject counter = counters.getJSONObject(i); JSONObject counter = counters.getJSONObject(j);
String counterName = counter.getString("name"); String counterName = counter.getString("name");
assertTrue("name not set", assertTrue("name not set",
(counterName != null && !counterName.isEmpty())); (counterName != null && !counterName.isEmpty()));

View File

@ -774,7 +774,7 @@ public void verifyAMJobTaskCounters(JSONObject info, Task task)
assertTrue("name not set", (name != null && !name.isEmpty())); assertTrue("name not set", (name != null && !name.isEmpty()));
JSONArray counters = counterGroup.getJSONArray("counter"); JSONArray counters = counterGroup.getJSONArray("counter");
for (int j = 0; j < counters.length(); j++) { for (int j = 0; j < counters.length(); j++) {
JSONObject counter = counters.getJSONObject(i); JSONObject counter = counters.getJSONObject(j);
String counterName = counter.getString("name"); String counterName = counter.getString("name");
assertTrue("name not set", assertTrue("name not set",
(counterName != null && !counterName.isEmpty())); (counterName != null && !counterName.isEmpty()));

View File

@ -22,7 +22,6 @@
import java.util.HashMap; import java.util.HashMap;
import java.util.List; import java.util.List;
import org.apache.commons.lang.StringUtils;
import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.mapred.JobPriority; import org.apache.hadoop.mapred.JobPriority;
import org.apache.hadoop.mapred.TaskCompletionEvent; import org.apache.hadoop.mapred.TaskCompletionEvent;
@ -45,14 +44,15 @@
import org.apache.hadoop.yarn.YarnException; import org.apache.hadoop.yarn.YarnException;
import org.apache.hadoop.yarn.api.records.ApplicationId; import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.ApplicationReport; import org.apache.hadoop.yarn.api.records.ApplicationReport;
import org.apache.hadoop.yarn.api.records.YarnApplicationState;
import org.apache.hadoop.yarn.api.records.NodeReport; import org.apache.hadoop.yarn.api.records.NodeReport;
import org.apache.hadoop.yarn.api.records.QueueACL; import org.apache.hadoop.yarn.api.records.QueueACL;
import org.apache.hadoop.yarn.api.records.QueueState; import org.apache.hadoop.yarn.api.records.QueueState;
import org.apache.hadoop.yarn.api.records.QueueUserACLInfo; import org.apache.hadoop.yarn.api.records.QueueUserACLInfo;
import org.apache.hadoop.yarn.api.records.YarnApplicationState;
import org.apache.hadoop.yarn.factories.RecordFactory; import org.apache.hadoop.yarn.factories.RecordFactory;
import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider; import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
@SuppressWarnings("deprecation")
public class TypeConverter { public class TypeConverter {
private static RecordFactory recordFactory; private static RecordFactory recordFactory;

View File

@ -18,15 +18,12 @@
package org.apache.hadoop.mapreduce.jobhistory; package org.apache.hadoop.mapreduce.jobhistory;
import java.io.IOException; import org.apache.avro.util.Utf8;
import org.apache.hadoop.classification.InterfaceAudience; import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability; import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.mapreduce.Counters; import org.apache.hadoop.mapreduce.Counters;
import org.apache.hadoop.mapreduce.JobID; import org.apache.hadoop.mapreduce.JobID;
import org.apache.avro.util.Utf8;
/** /**
* Event to record successful completion of job * Event to record successful completion of job
* *
@ -34,7 +31,18 @@
@InterfaceAudience.Private @InterfaceAudience.Private
@InterfaceStability.Unstable @InterfaceStability.Unstable
public class JobFinishedEvent implements HistoryEvent { public class JobFinishedEvent implements HistoryEvent {
private JobFinished datum = new JobFinished();
private JobFinished datum = null;
private JobID jobId;
private long finishTime;
private int finishedMaps;
private int finishedReduces;
private int failedMaps;
private int failedReduces;
private Counters mapCounters;
private Counters reduceCounters;
private Counters totalCounters;
/** /**
* Create an event to record successful job completion * Create an event to record successful job completion
@ -53,50 +61,75 @@ public JobFinishedEvent(JobID id, long finishTime,
int failedMaps, int failedReduces, int failedMaps, int failedReduces,
Counters mapCounters, Counters reduceCounters, Counters mapCounters, Counters reduceCounters,
Counters totalCounters) { Counters totalCounters) {
datum.jobid = new Utf8(id.toString()); this.jobId = id;
datum.finishTime = finishTime; this.finishTime = finishTime;
datum.finishedMaps = finishedMaps; this.finishedMaps = finishedMaps;
datum.finishedReduces = finishedReduces; this.finishedReduces = finishedReduces;
datum.failedMaps = failedMaps; this.failedMaps = failedMaps;
datum.failedReduces = failedReduces; this.failedReduces = failedReduces;
datum.mapCounters = this.mapCounters = mapCounters;
EventWriter.toAvro(mapCounters, "MAP_COUNTERS"); this.reduceCounters = reduceCounters;
datum.reduceCounters = this.totalCounters = totalCounters;
EventWriter.toAvro(reduceCounters, "REDUCE_COUNTERS");
datum.totalCounters =
EventWriter.toAvro(totalCounters, "TOTAL_COUNTERS");
} }
JobFinishedEvent() {} JobFinishedEvent() {}
public Object getDatum() { return datum; } public Object getDatum() {
public void setDatum(Object datum) { this.datum = (JobFinished)datum; } if (datum == null) {
datum = new JobFinished();
datum.jobid = new Utf8(jobId.toString());
datum.finishTime = finishTime;
datum.finishedMaps = finishedMaps;
datum.finishedReduces = finishedReduces;
datum.failedMaps = failedMaps;
datum.failedReduces = failedReduces;
datum.mapCounters = EventWriter.toAvro(mapCounters, "MAP_COUNTERS");
datum.reduceCounters = EventWriter.toAvro(reduceCounters,
"REDUCE_COUNTERS");
datum.totalCounters = EventWriter.toAvro(totalCounters, "TOTAL_COUNTERS");
}
return datum;
}
public void setDatum(Object oDatum) {
this.datum = (JobFinished) oDatum;
this.jobId = JobID.forName(datum.jobid.toString());
this.finishTime = datum.finishTime;
this.finishedMaps = datum.finishedMaps;
this.finishedReduces = datum.finishedReduces;
this.failedMaps = datum.failedMaps;
this.failedReduces = datum.failedReduces;
this.mapCounters = EventReader.fromAvro(datum.mapCounters);
this.reduceCounters = EventReader.fromAvro(datum.reduceCounters);
this.totalCounters = EventReader.fromAvro(datum.totalCounters);
}
public EventType getEventType() { public EventType getEventType() {
return EventType.JOB_FINISHED; return EventType.JOB_FINISHED;
} }
/** Get the Job ID */ /** Get the Job ID */
public JobID getJobid() { return JobID.forName(datum.jobid.toString()); } public JobID getJobid() { return jobId; }
/** Get the job finish time */ /** Get the job finish time */
public long getFinishTime() { return datum.finishTime; } public long getFinishTime() { return finishTime; }
/** Get the number of finished maps for the job */ /** Get the number of finished maps for the job */
public int getFinishedMaps() { return datum.finishedMaps; } public int getFinishedMaps() { return finishedMaps; }
/** Get the number of finished reducers for the job */ /** Get the number of finished reducers for the job */
public int getFinishedReduces() { return datum.finishedReduces; } public int getFinishedReduces() { return finishedReduces; }
/** Get the number of failed maps for the job */ /** Get the number of failed maps for the job */
public int getFailedMaps() { return datum.failedMaps; } public int getFailedMaps() { return failedMaps; }
/** Get the number of failed reducers for the job */ /** Get the number of failed reducers for the job */
public int getFailedReduces() { return datum.failedReduces; } public int getFailedReduces() { return failedReduces; }
/** Get the counters for the job */ /** Get the counters for the job */
public Counters getTotalCounters() { public Counters getTotalCounters() {
return EventReader.fromAvro(datum.totalCounters); return totalCounters;
} }
/** Get the Map counters for the job */ /** Get the Map counters for the job */
public Counters getMapCounters() { public Counters getMapCounters() {
return EventReader.fromAvro(datum.mapCounters); return mapCounters;
} }
/** Get the reduce counters for the job */ /** Get the reduce counters for the job */
public Counters getReduceCounters() { public Counters getReduceCounters() {
return EventReader.fromAvro(datum.reduceCounters); return reduceCounters;
} }
} }

View File

@ -34,7 +34,24 @@
@InterfaceAudience.Private @InterfaceAudience.Private
@InterfaceStability.Unstable @InterfaceStability.Unstable
public class MapAttemptFinishedEvent implements HistoryEvent { public class MapAttemptFinishedEvent implements HistoryEvent {
private MapAttemptFinished datum = new MapAttemptFinished();
private MapAttemptFinished datum = null;
private TaskAttemptID attemptId;
private TaskType taskType;
private String taskStatus;
private long finishTime;
private String hostname;
private String rackName;
private int port;
private long mapFinishTime;
private String state;
private Counters counters;
int[][] allSplits;
int[] clockSplits;
int[] cpuUsages;
int[] vMemKbytes;
int[] physMemKbytes;
/** /**
* Create an event for successful completion of map attempts * Create an event for successful completion of map attempts
@ -60,33 +77,21 @@ public class MapAttemptFinishedEvent implements HistoryEvent {
(TaskAttemptID id, TaskType taskType, String taskStatus, (TaskAttemptID id, TaskType taskType, String taskStatus,
long mapFinishTime, long finishTime, String hostname, int port, long mapFinishTime, long finishTime, String hostname, int port,
String rackName, String state, Counters counters, int[][] allSplits) { String rackName, String state, Counters counters, int[][] allSplits) {
datum.taskid = new Utf8(id.getTaskID().toString()); this.attemptId = id;
datum.attemptId = new Utf8(id.toString()); this.taskType = taskType;
datum.taskType = new Utf8(taskType.name()); this.taskStatus = taskStatus;
datum.taskStatus = new Utf8(taskStatus); this.mapFinishTime = mapFinishTime;
datum.mapFinishTime = mapFinishTime; this.finishTime = finishTime;
datum.finishTime = finishTime; this.hostname = hostname;
datum.hostname = new Utf8(hostname); this.rackName = rackName;
datum.port = port; this.port = port;
// This is needed for reading old jh files this.state = state;
if (rackName != null) { this.counters = counters;
datum.rackname = new Utf8(rackName); this.allSplits = allSplits;
} this.clockSplits = ProgressSplitsBlock.arrayGetWallclockTime(allSplits);
datum.state = new Utf8(state); this.cpuUsages = ProgressSplitsBlock.arrayGetCPUTime(allSplits);
datum.counters = EventWriter.toAvro(counters); this.vMemKbytes = ProgressSplitsBlock.arrayGetVMemKbytes(allSplits);
this.physMemKbytes = ProgressSplitsBlock.arrayGetPhysMemKbytes(allSplits);
datum.clockSplits
= AvroArrayUtils.toAvro
(ProgressSplitsBlock.arrayGetWallclockTime(allSplits));
datum.cpuUsages
= AvroArrayUtils.toAvro
(ProgressSplitsBlock.arrayGetCPUTime(allSplits));
datum.vMemKbytes
= AvroArrayUtils.toAvro
(ProgressSplitsBlock.arrayGetVMemKbytes(allSplits));
datum.physMemKbytes
= AvroArrayUtils.toAvro
(ProgressSplitsBlock.arrayGetPhysMemKbytes(allSplits));
} }
/** /**
@ -117,57 +122,100 @@ public class MapAttemptFinishedEvent implements HistoryEvent {
MapAttemptFinishedEvent() {} MapAttemptFinishedEvent() {}
public Object getDatum() { return datum; } public Object getDatum() {
public void setDatum(Object datum) { if (datum == null) {
this.datum = (MapAttemptFinished)datum; datum = new MapAttemptFinished();
datum.taskid = new Utf8(attemptId.getTaskID().toString());
datum.attemptId = new Utf8(attemptId.toString());
datum.taskType = new Utf8(taskType.name());
datum.taskStatus = new Utf8(taskStatus);
datum.mapFinishTime = mapFinishTime;
datum.finishTime = finishTime;
datum.hostname = new Utf8(hostname);
datum.port = port;
if (rackName != null) {
datum.rackname = new Utf8(rackName);
}
datum.state = new Utf8(state);
datum.counters = EventWriter.toAvro(counters);
datum.clockSplits = AvroArrayUtils.toAvro(ProgressSplitsBlock
.arrayGetWallclockTime(allSplits));
datum.cpuUsages = AvroArrayUtils.toAvro(ProgressSplitsBlock
.arrayGetCPUTime(allSplits));
datum.vMemKbytes = AvroArrayUtils.toAvro(ProgressSplitsBlock
.arrayGetVMemKbytes(allSplits));
datum.physMemKbytes = AvroArrayUtils.toAvro(ProgressSplitsBlock
.arrayGetPhysMemKbytes(allSplits));
}
return datum;
}
public void setDatum(Object oDatum) {
this.datum = (MapAttemptFinished)oDatum;
this.attemptId = TaskAttemptID.forName(datum.attemptId.toString());
this.taskType = TaskType.valueOf(datum.taskType.toString());
this.taskStatus = datum.taskStatus.toString();
this.mapFinishTime = datum.mapFinishTime;
this.finishTime = datum.finishTime;
this.hostname = datum.hostname.toString();
this.rackName = datum.rackname.toString();
this.port = datum.port;
this.state = datum.state.toString();
this.counters = EventReader.fromAvro(datum.counters);
this.clockSplits = AvroArrayUtils.fromAvro(datum.clockSplits);
this.cpuUsages = AvroArrayUtils.fromAvro(datum.cpuUsages);
this.vMemKbytes = AvroArrayUtils.fromAvro(datum.vMemKbytes);
this.physMemKbytes = AvroArrayUtils.fromAvro(datum.physMemKbytes);
} }
/** Get the task ID */ /** Get the task ID */
public TaskID getTaskId() { return TaskID.forName(datum.taskid.toString()); } public TaskID getTaskId() { return attemptId.getTaskID(); }
/** Get the attempt id */ /** Get the attempt id */
public TaskAttemptID getAttemptId() { public TaskAttemptID getAttemptId() {
return TaskAttemptID.forName(datum.attemptId.toString()); return attemptId;
} }
/** Get the task type */ /** Get the task type */
public TaskType getTaskType() { public TaskType getTaskType() {
return TaskType.valueOf(datum.taskType.toString()); return TaskType.valueOf(taskType.toString());
} }
/** Get the task status */ /** Get the task status */
public String getTaskStatus() { return datum.taskStatus.toString(); } public String getTaskStatus() { return taskStatus.toString(); }
/** Get the map phase finish time */ /** Get the map phase finish time */
public long getMapFinishTime() { return datum.mapFinishTime; } public long getMapFinishTime() { return mapFinishTime; }
/** Get the attempt finish time */ /** Get the attempt finish time */
public long getFinishTime() { return datum.finishTime; } public long getFinishTime() { return finishTime; }
/** Get the host name */ /** Get the host name */
public String getHostname() { return datum.hostname.toString(); } public String getHostname() { return hostname.toString(); }
/** Get the tracker rpc port */ /** Get the tracker rpc port */
public int getPort() { return datum.port; } public int getPort() { return port; }
/** Get the rack name */ /** Get the rack name */
public String getRackName() { public String getRackName() {
return datum.rackname == null ? null : datum.rackname.toString(); return rackName == null ? null : rackName.toString();
} }
/** Get the state string */ /** Get the state string */
public String getState() { return datum.state.toString(); } public String getState() { return state.toString(); }
/** Get the counters */ /** Get the counters */
Counters getCounters() { return EventReader.fromAvro(datum.counters); } Counters getCounters() { return counters; }
/** Get the event type */ /** Get the event type */
public EventType getEventType() { public EventType getEventType() {
return EventType.MAP_ATTEMPT_FINISHED; return EventType.MAP_ATTEMPT_FINISHED;
} }
public int[] getClockSplits() { public int[] getClockSplits() {
return AvroArrayUtils.fromAvro(datum.clockSplits); return clockSplits;
} }
public int[] getCpuUsages() { public int[] getCpuUsages() {
return AvroArrayUtils.fromAvro(datum.cpuUsages); return cpuUsages;
} }
public int[] getVMemKbytes() { public int[] getVMemKbytes() {
return AvroArrayUtils.fromAvro(datum.vMemKbytes); return vMemKbytes;
} }
public int[] getPhysMemKbytes() { public int[] getPhysMemKbytes() {
return AvroArrayUtils.fromAvro(datum.physMemKbytes); return physMemKbytes;
} }
} }

View File

@ -34,8 +34,25 @@
@InterfaceAudience.Private @InterfaceAudience.Private
@InterfaceStability.Unstable @InterfaceStability.Unstable
public class ReduceAttemptFinishedEvent implements HistoryEvent { public class ReduceAttemptFinishedEvent implements HistoryEvent {
private ReduceAttemptFinished datum =
new ReduceAttemptFinished(); private ReduceAttemptFinished datum = null;
private TaskAttemptID attemptId;
private TaskType taskType;
private String taskStatus;
private long shuffleFinishTime;
private long sortFinishTime;
private long finishTime;
private String hostname;
private String rackName;
private int port;
private String state;
private Counters counters;
int[][] allSplits;
int[] clockSplits;
int[] cpuUsages;
int[] vMemKbytes;
int[] physMemKbytes;
/** /**
* Create an event to record completion of a reduce attempt * Create an event to record completion of a reduce attempt
@ -60,33 +77,22 @@ public class ReduceAttemptFinishedEvent implements HistoryEvent {
long shuffleFinishTime, long sortFinishTime, long finishTime, long shuffleFinishTime, long sortFinishTime, long finishTime,
String hostname, int port, String rackName, String state, String hostname, int port, String rackName, String state,
Counters counters, int[][] allSplits) { Counters counters, int[][] allSplits) {
datum.taskid = new Utf8(id.getTaskID().toString()); this.attemptId = id;
datum.attemptId = new Utf8(id.toString()); this.taskType = taskType;
datum.taskType = new Utf8(taskType.name()); this.taskStatus = taskStatus;
datum.taskStatus = new Utf8(taskStatus); this.shuffleFinishTime = shuffleFinishTime;
datum.shuffleFinishTime = shuffleFinishTime; this.sortFinishTime = sortFinishTime;
datum.sortFinishTime = sortFinishTime; this.finishTime = finishTime;
datum.finishTime = finishTime; this.hostname = hostname;
datum.hostname = new Utf8(hostname); this.rackName = rackName;
datum.port = port; this.port = port;
if (rackName != null) { this.state = state;
datum.rackname = new Utf8(rackName); this.counters = counters;
} this.allSplits = allSplits;
datum.state = new Utf8(state); this.clockSplits = ProgressSplitsBlock.arrayGetWallclockTime(allSplits);
datum.counters = EventWriter.toAvro(counters); this.cpuUsages = ProgressSplitsBlock.arrayGetCPUTime(allSplits);
this.vMemKbytes = ProgressSplitsBlock.arrayGetVMemKbytes(allSplits);
datum.clockSplits this.physMemKbytes = ProgressSplitsBlock.arrayGetPhysMemKbytes(allSplits);
= AvroArrayUtils.toAvro
(ProgressSplitsBlock.arrayGetWallclockTime(allSplits));
datum.cpuUsages
= AvroArrayUtils.toAvro
(ProgressSplitsBlock.arrayGetCPUTime(allSplits));
datum.vMemKbytes
= AvroArrayUtils.toAvro
(ProgressSplitsBlock.arrayGetVMemKbytes(allSplits));
datum.physMemKbytes
= AvroArrayUtils.toAvro
(ProgressSplitsBlock.arrayGetPhysMemKbytes(allSplits));
} }
/** /**
@ -117,43 +123,87 @@ public class ReduceAttemptFinishedEvent implements HistoryEvent {
ReduceAttemptFinishedEvent() {} ReduceAttemptFinishedEvent() {}
public Object getDatum() { return datum; } public Object getDatum() {
public void setDatum(Object datum) { if (datum == null) {
this.datum = (ReduceAttemptFinished)datum; datum = new ReduceAttemptFinished();
datum.taskid = new Utf8(attemptId.getTaskID().toString());
datum.attemptId = new Utf8(attemptId.toString());
datum.taskType = new Utf8(taskType.name());
datum.taskStatus = new Utf8(taskStatus);
datum.shuffleFinishTime = shuffleFinishTime;
datum.sortFinishTime = sortFinishTime;
datum.finishTime = finishTime;
datum.hostname = new Utf8(hostname);
datum.port = port;
if (rackName != null) {
datum.rackname = new Utf8(rackName);
}
datum.state = new Utf8(state);
datum.counters = EventWriter.toAvro(counters);
datum.clockSplits = AvroArrayUtils.toAvro(ProgressSplitsBlock
.arrayGetWallclockTime(allSplits));
datum.cpuUsages = AvroArrayUtils.toAvro(ProgressSplitsBlock
.arrayGetCPUTime(allSplits));
datum.vMemKbytes = AvroArrayUtils.toAvro(ProgressSplitsBlock
.arrayGetVMemKbytes(allSplits));
datum.physMemKbytes = AvroArrayUtils.toAvro(ProgressSplitsBlock
.arrayGetPhysMemKbytes(allSplits));
}
return datum;
}
public void setDatum(Object oDatum) {
this.datum = (ReduceAttemptFinished)oDatum;
this.attemptId = TaskAttemptID.forName(datum.attemptId.toString());
this.taskType = TaskType.valueOf(datum.taskType.toString());
this.taskStatus = datum.taskStatus.toString();
this.shuffleFinishTime = datum.shuffleFinishTime;
this.sortFinishTime = datum.sortFinishTime;
this.finishTime = datum.finishTime;
this.hostname = datum.hostname.toString();
this.rackName = datum.rackname.toString();
this.port = datum.port;
this.state = datum.state.toString();
this.counters = EventReader.fromAvro(datum.counters);
this.clockSplits = AvroArrayUtils.fromAvro(datum.clockSplits);
this.cpuUsages = AvroArrayUtils.fromAvro(datum.cpuUsages);
this.vMemKbytes = AvroArrayUtils.fromAvro(datum.vMemKbytes);
this.physMemKbytes = AvroArrayUtils.fromAvro(datum.physMemKbytes);
} }
/** Get the Task ID */ /** Get the Task ID */
public TaskID getTaskId() { return TaskID.forName(datum.taskid.toString()); } public TaskID getTaskId() { return attemptId.getTaskID(); }
/** Get the attempt id */ /** Get the attempt id */
public TaskAttemptID getAttemptId() { public TaskAttemptID getAttemptId() {
return TaskAttemptID.forName(datum.attemptId.toString()); return TaskAttemptID.forName(attemptId.toString());
} }
/** Get the task type */ /** Get the task type */
public TaskType getTaskType() { public TaskType getTaskType() {
return TaskType.valueOf(datum.taskType.toString()); return TaskType.valueOf(taskType.toString());
} }
/** Get the task status */ /** Get the task status */
public String getTaskStatus() { return datum.taskStatus.toString(); } public String getTaskStatus() { return taskStatus.toString(); }
/** Get the finish time of the sort phase */ /** Get the finish time of the sort phase */
public long getSortFinishTime() { return datum.sortFinishTime; } public long getSortFinishTime() { return sortFinishTime; }
/** Get the finish time of the shuffle phase */ /** Get the finish time of the shuffle phase */
public long getShuffleFinishTime() { return datum.shuffleFinishTime; } public long getShuffleFinishTime() { return shuffleFinishTime; }
/** Get the finish time of the attempt */ /** Get the finish time of the attempt */
public long getFinishTime() { return datum.finishTime; } public long getFinishTime() { return finishTime; }
/** Get the name of the host where the attempt ran */ /** Get the name of the host where the attempt ran */
public String getHostname() { return datum.hostname.toString(); } public String getHostname() { return hostname.toString(); }
/** Get the tracker rpc port */ /** Get the tracker rpc port */
public int getPort() { return datum.port; } public int getPort() { return port; }
/** Get the rack name of the node where the attempt ran */ /** Get the rack name of the node where the attempt ran */
public String getRackName() { public String getRackName() {
return datum.rackname == null ? null : datum.rackname.toString(); return rackName == null ? null : rackName.toString();
} }
/** Get the state string */ /** Get the state string */
public String getState() { return datum.state.toString(); } public String getState() { return state.toString(); }
/** Get the counters for the attempt */ /** Get the counters for the attempt */
Counters getCounters() { return EventReader.fromAvro(datum.counters); } Counters getCounters() { return counters; }
/** Get the event type */ /** Get the event type */
public EventType getEventType() { public EventType getEventType() {
return EventType.REDUCE_ATTEMPT_FINISHED; return EventType.REDUCE_ATTEMPT_FINISHED;
@ -161,16 +211,16 @@ public EventType getEventType() {
public int[] getClockSplits() { public int[] getClockSplits() {
return AvroArrayUtils.fromAvro(datum.clockSplits); return clockSplits;
} }
public int[] getCpuUsages() { public int[] getCpuUsages() {
return AvroArrayUtils.fromAvro(datum.cpuUsages); return cpuUsages;
} }
public int[] getVMemKbytes() { public int[] getVMemKbytes() {
return AvroArrayUtils.fromAvro(datum.vMemKbytes); return vMemKbytes;
} }
public int[] getPhysMemKbytes() { public int[] getPhysMemKbytes() {
return AvroArrayUtils.fromAvro(datum.physMemKbytes); return physMemKbytes;
} }
} }

View File

@ -18,8 +18,7 @@
package org.apache.hadoop.mapreduce.jobhistory; package org.apache.hadoop.mapreduce.jobhistory;
import java.io.IOException; import org.apache.avro.util.Utf8;
import org.apache.hadoop.classification.InterfaceAudience; import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability; import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.mapreduce.Counters; import org.apache.hadoop.mapreduce.Counters;
@ -27,8 +26,6 @@
import org.apache.hadoop.mapreduce.TaskID; import org.apache.hadoop.mapreduce.TaskID;
import org.apache.hadoop.mapreduce.TaskType; import org.apache.hadoop.mapreduce.TaskType;
import org.apache.avro.util.Utf8;
/** /**
* Event to record successful task completion * Event to record successful task completion
* *
@ -36,7 +33,17 @@
@InterfaceAudience.Private @InterfaceAudience.Private
@InterfaceStability.Unstable @InterfaceStability.Unstable
public class TaskAttemptFinishedEvent implements HistoryEvent { public class TaskAttemptFinishedEvent implements HistoryEvent {
private TaskAttemptFinished datum = new TaskAttemptFinished();
private TaskAttemptFinished datum = null;
private TaskAttemptID attemptId;
private TaskType taskType;
private String taskStatus;
private long finishTime;
private String rackName;
private String hostname;
private String state;
private Counters counters;
/** /**
* Create an event to record successful finishes for setup and cleanup * Create an event to record successful finishes for setup and cleanup
@ -53,52 +60,73 @@ public TaskAttemptFinishedEvent(TaskAttemptID id,
TaskType taskType, String taskStatus, TaskType taskType, String taskStatus,
long finishTime, String rackName, long finishTime, String rackName,
String hostname, String state, Counters counters) { String hostname, String state, Counters counters) {
datum.taskid = new Utf8(id.getTaskID().toString()); this.attemptId = id;
datum.attemptId = new Utf8(id.toString()); this.taskType = taskType;
datum.taskType = new Utf8(taskType.name()); this.taskStatus = taskStatus;
datum.taskStatus = new Utf8(taskStatus); this.finishTime = finishTime;
datum.finishTime = finishTime; this.rackName = rackName;
if (rackName != null) { this.hostname = hostname;
datum.rackname = new Utf8(rackName); this.state = state;
} this.counters = counters;
datum.hostname = new Utf8(hostname);
datum.state = new Utf8(state);
datum.counters = EventWriter.toAvro(counters);
} }
TaskAttemptFinishedEvent() {} TaskAttemptFinishedEvent() {}
public Object getDatum() { return datum; } public Object getDatum() {
public void setDatum(Object datum) { if (datum == null) {
this.datum = (TaskAttemptFinished)datum; datum = new TaskAttemptFinished();
datum.taskid = new Utf8(attemptId.getTaskID().toString());
datum.attemptId = new Utf8(attemptId.toString());
datum.taskType = new Utf8(taskType.name());
datum.taskStatus = new Utf8(taskStatus);
datum.finishTime = finishTime;
if (rackName != null) {
datum.rackname = new Utf8(rackName);
}
datum.hostname = new Utf8(hostname);
datum.state = new Utf8(state);
datum.counters = EventWriter.toAvro(counters);
}
return datum;
}
public void setDatum(Object oDatum) {
this.datum = (TaskAttemptFinished)oDatum;
this.attemptId = TaskAttemptID.forName(datum.attemptId.toString());
this.taskType = TaskType.valueOf(datum.taskType.toString());
this.taskStatus = datum.taskStatus.toString();
this.finishTime = datum.finishTime;
this.rackName = datum.rackname.toString();
this.hostname = datum.hostname.toString();
this.state = datum.state.toString();
this.counters = EventReader.fromAvro(datum.counters);
} }
/** Get the task ID */ /** Get the task ID */
public TaskID getTaskId() { return TaskID.forName(datum.taskid.toString()); } public TaskID getTaskId() { return attemptId.getTaskID(); }
/** Get the task attempt id */ /** Get the task attempt id */
public TaskAttemptID getAttemptId() { public TaskAttemptID getAttemptId() {
return TaskAttemptID.forName(datum.attemptId.toString()); return TaskAttemptID.forName(attemptId.toString());
} }
/** Get the task type */ /** Get the task type */
public TaskType getTaskType() { public TaskType getTaskType() {
return TaskType.valueOf(datum.taskType.toString()); return TaskType.valueOf(taskType.toString());
} }
/** Get the task status */ /** Get the task status */
public String getTaskStatus() { return datum.taskStatus.toString(); } public String getTaskStatus() { return taskStatus.toString(); }
/** Get the attempt finish time */ /** Get the attempt finish time */
public long getFinishTime() { return datum.finishTime; } public long getFinishTime() { return finishTime; }
/** Get the host where the attempt executed */ /** Get the host where the attempt executed */
public String getHostname() { return datum.hostname.toString(); } public String getHostname() { return hostname.toString(); }
/** Get the rackname where the attempt executed */ /** Get the rackname where the attempt executed */
public String getRackName() { public String getRackName() {
return datum.rackname == null ? null : datum.rackname.toString(); return rackName == null ? null : rackName.toString();
} }
/** Get the state string */ /** Get the state string */
public String getState() { return datum.state.toString(); } public String getState() { return state.toString(); }
/** Get the counters for the attempt */ /** Get the counters for the attempt */
Counters getCounters() { return EventReader.fromAvro(datum.counters); } Counters getCounters() { return counters; }
/** Get the event type */ /** Get the event type */
public EventType getEventType() { public EventType getEventType() {
// Note that the task type can be setup/map/reduce/cleanup but the // Note that the task type can be setup/map/reduce/cleanup but the

View File

@ -18,16 +18,13 @@
package org.apache.hadoop.mapreduce.jobhistory; package org.apache.hadoop.mapreduce.jobhistory;
import java.io.IOException; import org.apache.avro.util.Utf8;
import org.apache.hadoop.classification.InterfaceAudience; import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability; import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.mapreduce.Counters; import org.apache.hadoop.mapreduce.Counters;
import org.apache.hadoop.mapreduce.TaskID; import org.apache.hadoop.mapreduce.TaskID;
import org.apache.hadoop.mapreduce.TaskType; import org.apache.hadoop.mapreduce.TaskType;
import org.apache.avro.util.Utf8;
/** /**
* Event to record the successful completion of a task * Event to record the successful completion of a task
* *
@ -35,7 +32,14 @@
@InterfaceAudience.Private @InterfaceAudience.Private
@InterfaceStability.Unstable @InterfaceStability.Unstable
public class TaskFinishedEvent implements HistoryEvent { public class TaskFinishedEvent implements HistoryEvent {
private TaskFinished datum = new TaskFinished();
private TaskFinished datum = null;
private TaskID taskid;
private long finishTime;
private TaskType taskType;
private String status;
private Counters counters;
/** /**
* Create an event to record the successful completion of a task * Create an event to record the successful completion of a task
@ -48,32 +52,48 @@ public class TaskFinishedEvent implements HistoryEvent {
public TaskFinishedEvent(TaskID id, long finishTime, public TaskFinishedEvent(TaskID id, long finishTime,
TaskType taskType, TaskType taskType,
String status, Counters counters) { String status, Counters counters) {
datum.taskid = new Utf8(id.toString()); this.taskid = id;
datum.finishTime = finishTime; this.finishTime = finishTime;
datum.counters = EventWriter.toAvro(counters); this.taskType = taskType;
datum.taskType = new Utf8(taskType.name()); this.status = status;
datum.status = new Utf8(status); this.counters = counters;
} }
TaskFinishedEvent() {} TaskFinishedEvent() {}
public Object getDatum() { return datum; } public Object getDatum() {
public void setDatum(Object datum) { if (datum == null) {
this.datum = (TaskFinished)datum; datum = new TaskFinished();
datum.taskid = new Utf8(taskid.toString());
datum.finishTime = finishTime;
datum.counters = EventWriter.toAvro(counters);
datum.taskType = new Utf8(taskType.name());
datum.status = new Utf8(status);
}
return datum;
}
public void setDatum(Object oDatum) {
this.datum = (TaskFinished)oDatum;
this.taskid = TaskID.forName(datum.taskid.toString());
this.finishTime = datum.finishTime;
this.taskType = TaskType.valueOf(datum.taskType.toString());
this.status = datum.status.toString();
this.counters = EventReader.fromAvro(datum.counters);
} }
/** Get task id */ /** Get task id */
public TaskID getTaskId() { return TaskID.forName(datum.taskid.toString()); } public TaskID getTaskId() { return TaskID.forName(taskid.toString()); }
/** Get the task finish time */ /** Get the task finish time */
public long getFinishTime() { return datum.finishTime; } public long getFinishTime() { return finishTime; }
/** Get task counters */ /** Get task counters */
public Counters getCounters() { return EventReader.fromAvro(datum.counters); } public Counters getCounters() { return counters; }
/** Get task type */ /** Get task type */
public TaskType getTaskType() { public TaskType getTaskType() {
return TaskType.valueOf(datum.taskType.toString()); return TaskType.valueOf(taskType.toString());
} }
/** Get task status */ /** Get task status */
public String getTaskStatus() { return datum.status.toString(); } public String getTaskStatus() { return status.toString(); }
/** Get event type */ /** Get event type */
public EventType getEventType() { public EventType getEventType() {
return EventType.TASK_FINISHED; return EventType.TASK_FINISHED;

View File

@ -32,13 +32,13 @@
import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path; import org.apache.hadoop.fs.Path;
import org.apache.hadoop.mapred.JobACLsManager; import org.apache.hadoop.mapred.JobACLsManager;
import org.apache.hadoop.mapreduce.Counters;
import org.apache.hadoop.mapreduce.JobACL; import org.apache.hadoop.mapreduce.JobACL;
import org.apache.hadoop.mapreduce.TypeConverter; import org.apache.hadoop.mapreduce.TypeConverter;
import org.apache.hadoop.mapreduce.jobhistory.JobHistoryParser; import org.apache.hadoop.mapreduce.jobhistory.JobHistoryParser;
import org.apache.hadoop.mapreduce.jobhistory.JobHistoryParser.JobInfo; import org.apache.hadoop.mapreduce.jobhistory.JobHistoryParser.JobInfo;
import org.apache.hadoop.mapreduce.jobhistory.JobHistoryParser.TaskInfo; import org.apache.hadoop.mapreduce.jobhistory.JobHistoryParser.TaskInfo;
import org.apache.hadoop.mapreduce.v2.api.records.AMInfo; import org.apache.hadoop.mapreduce.v2.api.records.AMInfo;
import org.apache.hadoop.mapreduce.v2.api.records.Counters;
import org.apache.hadoop.mapreduce.v2.api.records.JobId; import org.apache.hadoop.mapreduce.v2.api.records.JobId;
import org.apache.hadoop.mapreduce.v2.api.records.JobReport; import org.apache.hadoop.mapreduce.v2.api.records.JobReport;
import org.apache.hadoop.mapreduce.v2.api.records.JobState; import org.apache.hadoop.mapreduce.v2.api.records.JobState;
@ -89,7 +89,7 @@ public CompletedJob(Configuration conf, JobId jobId, Path historyFile,
loadFullHistoryData(loadTasks, historyFile); loadFullHistoryData(loadTasks, historyFile);
user = userName; user = userName;
counters = TypeConverter.toYarn(jobInfo.getTotalCounters()); counters = jobInfo.getTotalCounters();
diagnostics.add(jobInfo.getErrorInfo()); diagnostics.add(jobInfo.getErrorInfo());
report = report =
RecordFactoryProvider.getRecordFactory(null).newRecordInstance( RecordFactoryProvider.getRecordFactory(null).newRecordInstance(
@ -121,7 +121,7 @@ public int getCompletedReduces() {
} }
@Override @Override
public Counters getCounters() { public Counters getAllCounters() {
return counters; return counters;
} }

View File

@ -24,10 +24,10 @@
import org.apache.commons.logging.Log; import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory; import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.mapreduce.Counters;
import org.apache.hadoop.mapreduce.TypeConverter; import org.apache.hadoop.mapreduce.TypeConverter;
import org.apache.hadoop.mapreduce.jobhistory.JobHistoryParser.TaskAttemptInfo; import org.apache.hadoop.mapreduce.jobhistory.JobHistoryParser.TaskAttemptInfo;
import org.apache.hadoop.mapreduce.jobhistory.JobHistoryParser.TaskInfo; import org.apache.hadoop.mapreduce.jobhistory.JobHistoryParser.TaskInfo;
import org.apache.hadoop.mapreduce.v2.api.records.Counters;
import org.apache.hadoop.mapreduce.v2.api.records.TaskAttemptId; import org.apache.hadoop.mapreduce.v2.api.records.TaskAttemptId;
import org.apache.hadoop.mapreduce.v2.api.records.TaskId; import org.apache.hadoop.mapreduce.v2.api.records.TaskId;
import org.apache.hadoop.mapreduce.v2.api.records.TaskReport; import org.apache.hadoop.mapreduce.v2.api.records.TaskReport;
@ -60,7 +60,7 @@ public class CompletedTask implements Task {
this.finishTime = taskInfo.getFinishTime(); this.finishTime = taskInfo.getFinishTime();
this.type = TypeConverter.toYarn(taskInfo.getTaskType()); this.type = TypeConverter.toYarn(taskInfo.getTaskType());
if (taskInfo.getCounters() != null) if (taskInfo.getCounters() != null)
this.counters = TypeConverter.toYarn(taskInfo.getCounters()); this.counters = taskInfo.getCounters();
if (taskInfo.getTaskStatus() != null) { if (taskInfo.getTaskStatus() != null) {
this.state = TaskState.valueOf(taskInfo.getTaskStatus()); this.state = TaskState.valueOf(taskInfo.getTaskStatus());
} else { } else {
@ -86,7 +86,7 @@ public class CompletedTask implements Task {
report.setFinishTime(finishTime); report.setFinishTime(finishTime);
report.setTaskState(state); report.setTaskState(state);
report.setProgress(getProgress()); report.setProgress(getProgress());
report.setCounters(getCounters()); report.setCounters(TypeConverter.toYarn(getCounters()));
report.addAllRunningAttempts(new ArrayList<TaskAttemptId>(attempts.keySet())); report.addAllRunningAttempts(new ArrayList<TaskAttemptId>(attempts.keySet()));
} }

View File

@ -21,9 +21,9 @@
import java.util.ArrayList; import java.util.ArrayList;
import java.util.List; import java.util.List;
import org.apache.hadoop.mapreduce.Counters;
import org.apache.hadoop.mapreduce.TypeConverter; import org.apache.hadoop.mapreduce.TypeConverter;
import org.apache.hadoop.mapreduce.jobhistory.JobHistoryParser.TaskAttemptInfo; import org.apache.hadoop.mapreduce.jobhistory.JobHistoryParser.TaskAttemptInfo;
import org.apache.hadoop.mapreduce.v2.api.records.Counters;
import org.apache.hadoop.mapreduce.v2.api.records.TaskAttemptId; import org.apache.hadoop.mapreduce.v2.api.records.TaskAttemptId;
import org.apache.hadoop.mapreduce.v2.api.records.TaskAttemptReport; import org.apache.hadoop.mapreduce.v2.api.records.TaskAttemptReport;
import org.apache.hadoop.mapreduce.v2.api.records.TaskAttemptState; import org.apache.hadoop.mapreduce.v2.api.records.TaskAttemptState;
@ -46,8 +46,9 @@ public class CompletedTaskAttempt implements TaskAttempt {
CompletedTaskAttempt(TaskId taskId, TaskAttemptInfo attemptInfo) { CompletedTaskAttempt(TaskId taskId, TaskAttemptInfo attemptInfo) {
this.attemptInfo = attemptInfo; this.attemptInfo = attemptInfo;
this.attemptId = TypeConverter.toYarn(attemptInfo.getAttemptId()); this.attemptId = TypeConverter.toYarn(attemptInfo.getAttemptId());
if (attemptInfo.getCounters() != null) if (attemptInfo.getCounters() != null) {
this.counters = TypeConverter.toYarn(attemptInfo.getCounters()); this.counters = attemptInfo.getCounters();
}
if (attemptInfo.getTaskStatus() != null) { if (attemptInfo.getTaskStatus() != null) {
this.state = TaskAttemptState.valueOf(attemptInfo.getTaskStatus()); this.state = TaskAttemptState.valueOf(attemptInfo.getTaskStatus());
} else { } else {
@ -61,7 +62,6 @@ public class CompletedTaskAttempt implements TaskAttempt {
} }
report = RecordFactoryProvider.getRecordFactory(null).newRecordInstance(TaskAttemptReport.class); report = RecordFactoryProvider.getRecordFactory(null).newRecordInstance(TaskAttemptReport.class);
report.setCounters(counters);
report.setTaskAttemptId(attemptId); report.setTaskAttemptId(attemptId);
report.setTaskAttemptState(state); report.setTaskAttemptState(state);
@ -78,7 +78,7 @@ public class CompletedTaskAttempt implements TaskAttempt {
} }
// report.setPhase(attemptInfo.get); //TODO // report.setPhase(attemptInfo.get); //TODO
report.setStateString(attemptInfo.getState()); report.setStateString(attemptInfo.getState());
report.setCounters(getCounters()); report.setCounters(TypeConverter.toYarn(getCounters()));
report.setContainerId(attemptInfo.getContainerId()); report.setContainerId(attemptInfo.getContainerId());
if (attemptInfo.getHostname() == null) { if (attemptInfo.getHostname() == null) {
report.setNodeManagerHost("UNKNOWN"); report.setNodeManagerHost("UNKNOWN");

View File

@ -33,6 +33,7 @@
import org.apache.hadoop.fs.CommonConfigurationKeysPublic; import org.apache.hadoop.fs.CommonConfigurationKeysPublic;
import org.apache.hadoop.ipc.Server; import org.apache.hadoop.ipc.Server;
import org.apache.hadoop.mapreduce.JobACL; import org.apache.hadoop.mapreduce.JobACL;
import org.apache.hadoop.mapreduce.TypeConverter;
import org.apache.hadoop.mapreduce.v2.api.MRClientProtocol; import org.apache.hadoop.mapreduce.v2.api.MRClientProtocol;
import org.apache.hadoop.mapreduce.v2.api.protocolrecords.FailTaskAttemptRequest; import org.apache.hadoop.mapreduce.v2.api.protocolrecords.FailTaskAttemptRequest;
import org.apache.hadoop.mapreduce.v2.api.protocolrecords.FailTaskAttemptResponse; import org.apache.hadoop.mapreduce.v2.api.protocolrecords.FailTaskAttemptResponse;
@ -190,7 +191,7 @@ public GetCountersResponse getCounters(GetCountersRequest request) throws YarnRe
JobId jobId = request.getJobId(); JobId jobId = request.getJobId();
Job job = verifyAndGetJob(jobId); Job job = verifyAndGetJob(jobId);
GetCountersResponse response = recordFactory.newRecordInstance(GetCountersResponse.class); GetCountersResponse response = recordFactory.newRecordInstance(GetCountersResponse.class);
response.setCounters(job.getCounters()); response.setCounters(TypeConverter.toYarn(job.getAllCounters()));
return response; return response;
} }

View File

@ -22,9 +22,9 @@
import java.util.Map; import java.util.Map;
import org.apache.hadoop.fs.Path; import org.apache.hadoop.fs.Path;
import org.apache.hadoop.mapreduce.Counters;
import org.apache.hadoop.mapreduce.JobACL; import org.apache.hadoop.mapreduce.JobACL;
import org.apache.hadoop.mapreduce.v2.api.records.AMInfo; import org.apache.hadoop.mapreduce.v2.api.records.AMInfo;
import org.apache.hadoop.mapreduce.v2.api.records.Counters;
import org.apache.hadoop.mapreduce.v2.api.records.JobId; import org.apache.hadoop.mapreduce.v2.api.records.JobId;
import org.apache.hadoop.mapreduce.v2.api.records.JobReport; import org.apache.hadoop.mapreduce.v2.api.records.JobReport;
import org.apache.hadoop.mapreduce.v2.api.records.JobState; import org.apache.hadoop.mapreduce.v2.api.records.JobState;
@ -95,7 +95,7 @@ public float getProgress() {
} }
@Override @Override
public Counters getCounters() { public Counters getAllCounters() {
return null; return null;
} }

View File

@ -699,7 +699,7 @@ public void verifyHsJobTaskAttemptCounters(JSONObject info, TaskAttempt att)
assertTrue("name not set", (name != null && !name.isEmpty())); assertTrue("name not set", (name != null && !name.isEmpty()));
JSONArray counters = counterGroup.getJSONArray("counter"); JSONArray counters = counterGroup.getJSONArray("counter");
for (int j = 0; j < counters.length(); j++) { for (int j = 0; j < counters.length(); j++) {
JSONObject counter = counters.getJSONObject(i); JSONObject counter = counters.getJSONObject(j);
String counterName = counter.getString("name"); String counterName = counter.getString("name");
assertTrue("name not set", assertTrue("name not set",
(counterName != null && !counterName.isEmpty())); (counterName != null && !counterName.isEmpty()));

View File

@ -788,7 +788,7 @@ public void verifyHsJobTaskCounters(JSONObject info, Task task)
assertTrue("name not set", (name != null && !name.isEmpty())); assertTrue("name not set", (name != null && !name.isEmpty()));
JSONArray counters = counterGroup.getJSONArray("counter"); JSONArray counters = counterGroup.getJSONArray("counter");
for (int j = 0; j < counters.length(); j++) { for (int j = 0; j < counters.length(); j++) {
JSONObject counter = counters.getJSONObject(i); JSONObject counter = counters.getJSONObject(j);
String counterName = counter.getString("name"); String counterName = counter.getString("name");
assertTrue("name not set", assertTrue("name not set",
(counterName != null && !counterName.isEmpty())); (counterName != null && !counterName.isEmpty()));