From 65e6abb4775eb3e4abc38ca60d351ee286b5a944 Mon Sep 17 00:00:00 2001 From: Vinod Kumar Vavilapalli Date: Mon, 9 Jan 2012 21:07:25 +0000 Subject: [PATCH] MAPREDUCE-3511. Removed a multitude of cloned/duplicate counters in the AM thereby reducing the AM heap size and preventing full GCs. (vinodkv) svn merge -c 1229347 trunk git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/branches/branch-0.23@1229350 13f79535-47bb-0310-9956-ffa450edef68 --- hadoop-mapreduce-project/CHANGES.txt | 3 + .../mapred/TaskAttemptListenerImpl.java | 8 +- .../jobhistory/JobHistoryEventHandler.java | 36 +++-- .../v2/app/client/MRClientService.java | 24 ++- .../hadoop/mapreduce/v2/app/job/Job.java | 12 +- .../hadoop/mapreduce/v2/app/job/Task.java | 2 +- .../mapreduce/v2/app/job/TaskAttempt.java | 2 +- .../event/TaskAttemptStatusUpdateEvent.java | 3 +- .../mapreduce/v2/app/job/impl/JobImpl.java | 105 ++++--------- .../v2/app/job/impl/TaskAttemptImpl.java | 30 ++-- .../mapreduce/v2/app/job/impl/TaskImpl.java | 12 +- .../v2/app/recover/RecoveryService.java | 4 +- .../v2/app/webapp/CountersBlock.java | 48 +++--- .../v2/app/webapp/SingleCounterBlock.java | 26 ++-- .../v2/app/webapp/dao/CounterGroupInfo.java | 12 +- .../v2/app/webapp/dao/CounterInfo.java | 8 +- .../v2/app/webapp/dao/JobCounterInfo.java | 32 ++-- .../webapp/dao/JobTaskAttemptCounterInfo.java | 6 +- .../v2/app/webapp/dao/JobTaskCounterInfo.java | 6 +- .../app/webapp/dao/TaskCounterGroupInfo.java | 8 +- .../hadoop/mapreduce/v2/app/MockJobs.java | 29 ++-- .../v2/app/TestRuntimeEstimators.java | 15 +- .../app/webapp/TestAMWebServicesAttempts.java | 2 +- .../v2/app/webapp/TestAMWebServicesTasks.java | 2 +- .../hadoop/mapreduce/TypeConverter.java | 4 +- .../jobhistory/JobFinishedEvent.java | 89 +++++++---- .../jobhistory/MapAttemptFinishedEvent.java | 142 +++++++++++------ .../ReduceAttemptFinishedEvent.java | 146 ++++++++++++------ .../jobhistory/TaskAttemptFinishedEvent.java | 84 ++++++---- .../jobhistory/TaskFinishedEvent.java | 56 ++++--- .../hadoop/mapreduce/v2/hs/CompletedJob.java | 6 +- .../hadoop/mapreduce/v2/hs/CompletedTask.java | 6 +- .../mapreduce/v2/hs/CompletedTaskAttempt.java | 10 +- .../mapreduce/v2/hs/HistoryClientService.java | 3 +- .../hadoop/mapreduce/v2/hs/PartialJob.java | 4 +- .../hs/webapp/TestHsWebServicesAttempts.java | 2 +- .../v2/hs/webapp/TestHsWebServicesTasks.java | 2 +- 37 files changed, 584 insertions(+), 405 deletions(-) diff --git a/hadoop-mapreduce-project/CHANGES.txt b/hadoop-mapreduce-project/CHANGES.txt index 17bef1c143d..4042f0ec500 100644 --- a/hadoop-mapreduce-project/CHANGES.txt +++ b/hadoop-mapreduce-project/CHANGES.txt @@ -108,6 +108,9 @@ Release 0.23.1 - Unreleased MAPREDUCE-3569. TaskAttemptListener holds a global lock for all task-updates. (Vinod Kumar Vavilapalli via sseth) + MAPREDUCE-3511. Removed a multitude of cloned/duplicate counters in the AM + thereby reducing the AM heap size and preventing full GCs. (vinodkv) + BUG FIXES MAPREDUCE-3462. Fix Gridmix JUnit testcase failures. (Ravi Prakash and Ravi Gummadi via amarrk) diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapred/TaskAttemptListenerImpl.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapred/TaskAttemptListenerImpl.java index 0e6e3eed04c..cefc7b7b9aa 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapred/TaskAttemptListenerImpl.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapred/TaskAttemptListenerImpl.java @@ -325,9 +325,11 @@ public class TaskAttemptListenerImpl extends CompositeService taskAttemptStatus.outputSize = taskStatus.getOutputSize(); // Task sends the updated phase to the TT. taskAttemptStatus.phase = TypeConverter.toYarn(taskStatus.getPhase()); - // Counters are updated by the task. - taskAttemptStatus.counters = - TypeConverter.toYarn(taskStatus.getCounters()); + // Counters are updated by the task. Convert counters into new format as + // that is the primary storage format inside the AM to avoid multiple + // conversions and unnecessary heap usage. + taskAttemptStatus.counters = new org.apache.hadoop.mapreduce.Counters( + taskStatus.getCounters()); // Map Finish time set by the task (map only) if (taskStatus.getIsMap() && taskStatus.getMapFinishTime() != 0) { diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/jobhistory/JobHistoryEventHandler.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/jobhistory/JobHistoryEventHandler.java index ae886cf6d1f..169917a30c1 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/jobhistory/JobHistoryEventHandler.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/jobhistory/JobHistoryEventHandler.java @@ -36,10 +36,11 @@ import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.FileUtil; import org.apache.hadoop.fs.Path; import org.apache.hadoop.fs.permission.FsPermission; +import org.apache.hadoop.mapreduce.Counter; +import org.apache.hadoop.mapreduce.Counters; import org.apache.hadoop.mapreduce.JobCounter; import org.apache.hadoop.mapreduce.MRJobConfig; import org.apache.hadoop.mapreduce.TaskType; -import org.apache.hadoop.mapreduce.v2.api.records.Counter; import org.apache.hadoop.mapreduce.v2.api.records.JobId; import org.apache.hadoop.mapreduce.v2.api.records.JobState; import org.apache.hadoop.mapreduce.v2.app.AppContext; @@ -63,6 +64,8 @@ public class JobHistoryEventHandler extends AbstractService private final AppContext context; private final int startCount; + private int eventCounter; + //TODO Does the FS object need to be different ? private FileSystem stagingDirFS; // log Dir FileSystem private FileSystem doneDirFS; // done Dir FileSystem @@ -210,6 +213,16 @@ public class JobHistoryEventHandler extends AbstractService public void run() { JobHistoryEvent event = null; while (!stopped && !Thread.currentThread().isInterrupted()) { + + // Log the size of the history-event-queue every so often. + if (eventCounter % 1000 == 0) { + eventCounter = 0; + LOG.info("Size of the JobHistory event queue is " + + eventQueue.size()); + } else { + eventCounter++; + } + try { event = eventQueue.take(); } catch (InterruptedException e) { @@ -238,7 +251,8 @@ public class JobHistoryEventHandler extends AbstractService @Override public void stop() { - LOG.info("Stopping JobHistoryEventHandler"); + LOG.info("Stopping JobHistoryEventHandler. " + + "Size of the outstanding queue size is " + eventQueue.size()); stopped = true; //do not interrupt while event handling is in progress synchronized(lock) { @@ -483,7 +497,7 @@ public class JobHistoryEventHandler extends AbstractService .toString()); // TODO JOB_FINISHED does not have state. Effectively job history does not // have state about the finished job. - setSummarySlotSeconds(summary, jobId); + setSummarySlotSeconds(summary, jfe.getTotalCounters()); break; case JOB_FAILED: case JOB_KILLED: @@ -492,21 +506,21 @@ public class JobHistoryEventHandler extends AbstractService summary.setNumFinishedMaps(context.getJob(jobId).getTotalMaps()); summary.setNumFinishedReduces(context.getJob(jobId).getTotalReduces()); summary.setJobFinishTime(juce.getFinishTime()); - setSummarySlotSeconds(summary, jobId); + setSummarySlotSeconds(summary, context.getJob(jobId).getAllCounters()); break; } } - private void setSummarySlotSeconds(JobSummary summary, JobId jobId) { - Counter slotMillisMapCounter = - context.getJob(jobId).getCounters() - .getCounter(JobCounter.SLOTS_MILLIS_MAPS); + private void setSummarySlotSeconds(JobSummary summary, Counters allCounters) { + + Counter slotMillisMapCounter = allCounters + .findCounter(JobCounter.SLOTS_MILLIS_MAPS); if (slotMillisMapCounter != null) { summary.setMapSlotSeconds(slotMillisMapCounter.getValue()); } - Counter slotMillisReduceCounter = - context.getJob(jobId).getCounters() - .getCounter(JobCounter.SLOTS_MILLIS_REDUCES); + + Counter slotMillisReduceCounter = allCounters + .findCounter(JobCounter.SLOTS_MILLIS_REDUCES); if (slotMillisReduceCounter != null) { summary.setMapSlotSeconds(slotMillisReduceCounter.getValue()); } diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/client/MRClientService.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/client/MRClientService.java index 2d787a6776b..68d0179625c 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/client/MRClientService.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/client/MRClientService.java @@ -31,6 +31,7 @@ import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.CommonConfigurationKeysPublic; import org.apache.hadoop.ipc.Server; import org.apache.hadoop.mapreduce.MRJobConfig; +import org.apache.hadoop.mapreduce.TypeConverter; import org.apache.hadoop.mapreduce.v2.api.MRClientProtocol; import org.apache.hadoop.mapreduce.v2.api.protocolrecords.FailTaskAttemptRequest; import org.apache.hadoop.mapreduce.v2.api.protocolrecords.FailTaskAttemptResponse; @@ -223,7 +224,7 @@ public class MRClientService extends AbstractService Job job = verifyAndGetJob(jobId, false); GetCountersResponse response = recordFactory.newRecordInstance(GetCountersResponse.class); - response.setCounters(job.getCounters()); + response.setCounters(TypeConverter.toYarn(job.getAllCounters())); return response; } @@ -237,8 +238,7 @@ public class MRClientService extends AbstractService response.setJobReport(job.getReport()); return response; } - - + @Override public GetTaskAttemptReportResponse getTaskAttemptReport( GetTaskAttemptReportRequest request) throws YarnRemoteException { @@ -356,6 +356,8 @@ public class MRClientService extends AbstractService return response; } + private final Object getTaskReportsLock = new Object(); + @Override public GetTaskReportsResponse getTaskReports( GetTaskReportsRequest request) throws YarnRemoteException { @@ -366,12 +368,18 @@ public class MRClientService extends AbstractService recordFactory.newRecordInstance(GetTaskReportsResponse.class); Job job = verifyAndGetJob(jobId, false); - LOG.info("Getting task report for " + taskType + " " + jobId); Collection tasks = job.getTasks(taskType).values(); - LOG.info("Getting task report size " + tasks.size()); - for (Task task : tasks) { - response.addTaskReport(task.getReport()); - } + LOG.info("Getting task report for " + taskType + " " + jobId + + ". Report-size will be " + tasks.size()); + + // Take lock to allow only one call, otherwise heap will blow up because + // of counters in the report when there are multiple callers. + synchronized (getTaskReportsLock) { + for (Task task : tasks) { + response.addTaskReport(task.getReport()); + } + } + return response; } } diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/Job.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/Job.java index 9094c77cc33..d30bb737ed9 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/Job.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/Job.java @@ -22,9 +22,9 @@ import java.util.List; import java.util.Map; import org.apache.hadoop.fs.Path; +import org.apache.hadoop.mapreduce.Counters; import org.apache.hadoop.mapreduce.JobACL; import org.apache.hadoop.mapreduce.v2.api.records.AMInfo; -import org.apache.hadoop.mapreduce.v2.api.records.Counters; import org.apache.hadoop.mapreduce.v2.api.records.JobId; import org.apache.hadoop.mapreduce.v2.api.records.JobReport; import org.apache.hadoop.mapreduce.v2.api.records.JobState; @@ -44,7 +44,15 @@ public interface Job { String getName(); JobState getState(); JobReport getReport(); - Counters getCounters(); + + /** + * Get all the counters of this job. This includes job-counters aggregated + * together with the counters of each task. This creates a clone of the + * Counters, so use this judiciously. + * @return job-counters and aggregate task-counters + */ + Counters getAllCounters(); + Map getTasks(); Map getTasks(TaskType taskType); Task getTask(TaskId taskID); diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/Task.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/Task.java index 84eb83409bb..fa8ceb2742a 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/Task.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/Task.java @@ -20,7 +20,7 @@ package org.apache.hadoop.mapreduce.v2.app.job; import java.util.Map; -import org.apache.hadoop.mapreduce.v2.api.records.Counters; +import org.apache.hadoop.mapreduce.Counters; import org.apache.hadoop.mapreduce.v2.api.records.TaskAttemptId; import org.apache.hadoop.mapreduce.v2.api.records.TaskId; import org.apache.hadoop.mapreduce.v2.api.records.TaskReport; diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/TaskAttempt.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/TaskAttempt.java index cc7449524e0..e627128975c 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/TaskAttempt.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/TaskAttempt.java @@ -20,7 +20,7 @@ package org.apache.hadoop.mapreduce.v2.app.job; import java.util.List; -import org.apache.hadoop.mapreduce.v2.api.records.Counters; +import org.apache.hadoop.mapreduce.Counters; import org.apache.hadoop.mapreduce.v2.api.records.TaskAttemptId; import org.apache.hadoop.mapreduce.v2.api.records.TaskAttemptReport; import org.apache.hadoop.mapreduce.v2.api.records.TaskAttemptState; diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/event/TaskAttemptStatusUpdateEvent.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/event/TaskAttemptStatusUpdateEvent.java index 6a9b78af43d..ac07e948139 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/event/TaskAttemptStatusUpdateEvent.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/event/TaskAttemptStatusUpdateEvent.java @@ -20,12 +20,11 @@ package org.apache.hadoop.mapreduce.v2.app.job.event; import java.util.List; -import org.apache.hadoop.mapreduce.v2.api.records.Counters; +import org.apache.hadoop.mapreduce.Counters; import org.apache.hadoop.mapreduce.v2.api.records.Phase; import org.apache.hadoop.mapreduce.v2.api.records.TaskAttemptId; import org.apache.hadoop.mapreduce.v2.api.records.TaskAttemptState; - public class TaskAttemptStatusUpdateEvent extends TaskAttemptEvent { private TaskAttemptStatus reportedTaskAttemptStatus; diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/JobImpl.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/JobImpl.java index f498590fdc7..808e92d7624 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/JobImpl.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/JobImpl.java @@ -41,6 +41,7 @@ import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapred.JobACLsManager; import org.apache.hadoop.mapred.JobConf; +import org.apache.hadoop.mapreduce.Counters; import org.apache.hadoop.mapreduce.JobACL; import org.apache.hadoop.mapreduce.JobContext; import org.apache.hadoop.mapreduce.MRJobConfig; @@ -61,9 +62,6 @@ import org.apache.hadoop.mapreduce.split.JobSplit.TaskSplitMetaInfo; import org.apache.hadoop.mapreduce.split.SplitMetaInfoReader; import org.apache.hadoop.mapreduce.task.JobContextImpl; import org.apache.hadoop.mapreduce.v2.api.records.AMInfo; -import org.apache.hadoop.mapreduce.v2.api.records.Counter; -import org.apache.hadoop.mapreduce.v2.api.records.CounterGroup; -import org.apache.hadoop.mapreduce.v2.api.records.Counters; import org.apache.hadoop.mapreduce.v2.api.records.JobId; import org.apache.hadoop.mapreduce.v2.api.records.JobReport; import org.apache.hadoop.mapreduce.v2.api.records.JobState; @@ -99,7 +97,6 @@ import org.apache.hadoop.yarn.Clock; import org.apache.hadoop.yarn.YarnException; import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; import org.apache.hadoop.yarn.event.EventHandler; -import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider; import org.apache.hadoop.yarn.state.InvalidStateTransitonException; import org.apache.hadoop.yarn.state.MultipleArcTransition; import org.apache.hadoop.yarn.state.SingleArcTransition; @@ -109,10 +106,13 @@ import org.apache.hadoop.yarn.state.StateMachineFactory; /** Implementation of Job interface. Maintains the state machines of Job. * The read and write calls use ReadWriteLock for concurrency. */ -@SuppressWarnings({ "rawtypes", "deprecation" }) +@SuppressWarnings({ "rawtypes", "deprecation", "unchecked" }) public class JobImpl implements org.apache.hadoop.mapreduce.v2.app.job.Job, EventHandler { + private static final TaskAttemptCompletionEvent[] + EMPTY_TASK_ATTEMPT_COMPLETION_EVENTS = new TaskAttemptCompletionEvent[0]; + private static final Log LOG = LogFactory.getLog(JobImpl.class); //The maximum fraction of fetch failures allowed for a map @@ -152,7 +152,7 @@ public class JobImpl implements org.apache.hadoop.mapreduce.v2.app.job.Job, private boolean lazyTasksCopyNeeded = false; volatile Map tasks = new LinkedHashMap(); - private Counters jobCounters = newCounters(); + private Counters jobCounters = new Counters(); // FIXME: // // Can then replace task-level uber counters (MR-2424) with job-level ones @@ -475,88 +475,29 @@ public class JobImpl implements org.apache.hadoop.mapreduce.v2.app.job.Job, } @Override - public Counters getCounters() { - Counters counters = newCounters(); + public Counters getAllCounters() { + Counters counters = new Counters(); readLock.lock(); try { - incrAllCounters(counters, jobCounters); + counters.incrAllCounters(jobCounters); return incrTaskCounters(counters, tasks.values()); } finally { readLock.unlock(); } } - private Counters getTypeCounters(Set taskIds) { - Counters counters = newCounters(); - for (TaskId taskId : taskIds) { - Task task = tasks.get(taskId); - incrAllCounters(counters, task.getCounters()); - } - return counters; - } - - private Counters getMapCounters() { - readLock.lock(); - try { - return getTypeCounters(mapTasks); - } finally { - readLock.unlock(); - } - } - - private Counters getReduceCounters() { - readLock.lock(); - try { - return getTypeCounters(reduceTasks); - } finally { - readLock.unlock(); - } - } - - public static Counters newCounters() { - Counters counters = RecordFactoryProvider.getRecordFactory(null) - .newRecordInstance(Counters.class); - return counters; - } - - public static Counters incrTaskCounters(Counters counters, - Collection tasks) { + public static Counters incrTaskCounters( + Counters counters, Collection tasks) { for (Task task : tasks) { - incrAllCounters(counters, task.getCounters()); + counters.incrAllCounters(task.getCounters()); } return counters; } - public static void incrAllCounters(Counters counters, Counters other) { - if (other != null) { - for (CounterGroup otherGroup: other.getAllCounterGroups().values()) { - CounterGroup group = counters.getCounterGroup(otherGroup.getName()); - if (group == null) { - group = RecordFactoryProvider.getRecordFactory(null) - .newRecordInstance(CounterGroup.class); - group.setName(otherGroup.getName()); - counters.setCounterGroup(group.getName(), group); - } - group.setDisplayName(otherGroup.getDisplayName()); - for (Counter otherCounter : otherGroup.getAllCounters().values()) { - Counter counter = group.getCounter(otherCounter.getName()); - if (counter == null) { - counter = RecordFactoryProvider.getRecordFactory(null) - .newRecordInstance(Counter.class); - counter.setName(otherCounter.getName()); - group.setCounter(counter.getName(), counter); - } - counter.setDisplayName(otherCounter.getDisplayName()); - counter.setValue(counter.getValue() + otherCounter.getValue()); - } - } - } - } - @Override public TaskAttemptCompletionEvent[] getTaskAttemptCompletionEvents( int fromEventId, int maxEvents) { - TaskAttemptCompletionEvent[] events = new TaskAttemptCompletionEvent[0]; + TaskAttemptCompletionEvent[] events = EMPTY_TASK_ATTEMPT_COMPLETION_EVENTS; readLock.lock(); try { if (taskAttemptCompletionEvents.size() > fromEventId) { @@ -1204,13 +1145,24 @@ public class JobImpl implements org.apache.hadoop.mapreduce.v2.app.job.Job, // area. May need to create a new event type for this if JobFinished should // not be generated for KilledJobs, etc. private static JobFinishedEvent createJobFinishedEvent(JobImpl job) { + + Counters mapCounters = new Counters(); + Counters reduceCounters = new Counters(); + for (Task t : job.tasks.values()) { + Counters counters = t.getCounters(); + switch (t.getType()) { + case MAP: mapCounters.incrAllCounters(counters); break; + case REDUCE: reduceCounters.incrAllCounters(counters); break; + } + } + JobFinishedEvent jfe = new JobFinishedEvent( job.oldJobId, job.finishTime, job.succeededMapTaskCount, job.succeededReduceTaskCount, job.failedMapTaskCount, job.failedReduceTaskCount, - TypeConverter.fromYarn(job.getMapCounters()), - TypeConverter.fromYarn(job.getReduceCounters()), - TypeConverter.fromYarn(job.getCounters())); + mapCounters, + reduceCounters, + job.getAllCounters()); return jfe; } @@ -1450,7 +1402,8 @@ public class JobImpl implements org.apache.hadoop.mapreduce.v2.app.job.Job, JobCounterUpdateEvent jce = (JobCounterUpdateEvent) event; for (JobCounterUpdateEvent.CounterIncrementalUpdate ci : jce .getCounterUpdates()) { - job.jobCounters.incrCounter(ci.getCounterKey(), ci.getIncrementValue()); + job.jobCounters.findCounter(ci.getCounterKey()).increment( + ci.getIncrementValue()); } } } diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TaskAttemptImpl.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TaskAttemptImpl.java index 4655895dbf7..d9ed1e53f8e 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TaskAttemptImpl.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TaskAttemptImpl.java @@ -47,6 +47,8 @@ import org.apache.hadoop.mapred.Task; import org.apache.hadoop.mapred.TaskAttemptContextImpl; import org.apache.hadoop.mapred.WrappedJvmID; import org.apache.hadoop.mapred.WrappedProgressSplitsBlock; +import org.apache.hadoop.mapreduce.Counter; +import org.apache.hadoop.mapreduce.Counters; import org.apache.hadoop.mapreduce.JobCounter; import org.apache.hadoop.mapreduce.MRJobConfig; import org.apache.hadoop.mapreduce.OutputCommitter; @@ -60,8 +62,6 @@ import org.apache.hadoop.mapreduce.jobhistory.TaskAttemptStartedEvent; import org.apache.hadoop.mapreduce.jobhistory.TaskAttemptUnsuccessfulCompletionEvent; import org.apache.hadoop.mapreduce.security.TokenCache; import org.apache.hadoop.mapreduce.security.token.JobTokenIdentifier; -import org.apache.hadoop.mapreduce.v2.api.records.Counter; -import org.apache.hadoop.mapreduce.v2.api.records.Counters; import org.apache.hadoop.mapreduce.v2.api.records.Phase; import org.apache.hadoop.mapreduce.v2.api.records.TaskAttemptId; import org.apache.hadoop.mapreduce.v2.api.records.TaskAttemptReport; @@ -132,6 +132,7 @@ public abstract class TaskAttemptImpl implements org.apache.hadoop.mapreduce.v2.app.job.TaskAttempt, EventHandler { + static final Counters EMPTY_COUNTERS = new Counters(); private static final Log LOG = LogFactory.getLog(TaskAttemptImpl.class); private static final long MEMORY_SPLITS_RESOLUTION = 1024; //TODO Make configurable? private static final int MAP_MEMORY_MB_DEFAULT = 1024; @@ -846,7 +847,7 @@ public abstract class TaskAttemptImpl implements result.setDiagnosticInfo(StringUtils.join(LINE_SEPARATOR, getDiagnostics())); result.setPhase(reportedStatus.phase); result.setStateString(reportedStatus.stateString); - result.setCounters(getCounters()); + result.setCounters(TypeConverter.toYarn(getCounters())); result.setContainerId(this.getAssignedContainerID()); result.setNodeManagerHost(trackerName); result.setNodeManagerHttpPort(httpPort); @@ -877,7 +878,7 @@ public abstract class TaskAttemptImpl implements try { Counters counters = reportedStatus.counters; if (counters == null) { - counters = recordFactory.newRecordInstance(Counters.class); + counters = EMPTY_COUNTERS; // counters.groups = new HashMap(); } return counters; @@ -1031,22 +1032,21 @@ public abstract class TaskAttemptImpl implements (int) (now - start)); } - Counter cpuCounter = counters.getCounter( - TaskCounter.CPU_MILLISECONDS); + Counter cpuCounter = counters.findCounter(TaskCounter.CPU_MILLISECONDS); if (cpuCounter != null && cpuCounter.getValue() <= Integer.MAX_VALUE) { splitsBlock.getProgressCPUTime().extend(newProgress, - (int) cpuCounter.getValue()); + (int) cpuCounter.getValue()); // long to int? TODO: FIX. Same below } - Counter virtualBytes = counters.getCounter( - TaskCounter.VIRTUAL_MEMORY_BYTES); + Counter virtualBytes = counters + .findCounter(TaskCounter.VIRTUAL_MEMORY_BYTES); if (virtualBytes != null) { splitsBlock.getProgressVirtualMemoryKbytes().extend(newProgress, (int) (virtualBytes.getValue() / (MEMORY_SPLITS_RESOLUTION))); } - Counter physicalBytes = counters.getCounter( - TaskCounter.PHYSICAL_MEMORY_BYTES); + Counter physicalBytes = counters + .findCounter(TaskCounter.PHYSICAL_MEMORY_BYTES); if (physicalBytes != null) { splitsBlock.getProgressPhysicalMemoryKbytes().extend(newProgress, (int) (physicalBytes.getValue() / (MEMORY_SPLITS_RESOLUTION))); @@ -1343,7 +1343,7 @@ public abstract class TaskAttemptImpl implements this.containerNodeId == null ? -1 : this.containerNodeId.getPort(), this.nodeRackName == null ? "UNKNOWN" : this.nodeRackName, this.reportedStatus.stateString, - TypeConverter.fromYarn(getCounters()), + getCounters(), getProgressSplitBlock().burst()); eventHandler.handle( new JobHistoryEvent(attemptId.getTaskId().getJobId(), mfe)); @@ -1360,7 +1360,7 @@ public abstract class TaskAttemptImpl implements this.containerNodeId == null ? -1 : this.containerNodeId.getPort(), this.nodeRackName == null ? "UNKNOWN" : this.nodeRackName, this.reportedStatus.stateString, - TypeConverter.fromYarn(getCounters()), + getCounters(), getProgressSplitBlock().burst()); eventHandler.handle( new JobHistoryEvent(attemptId.getTaskId().getJobId(), rfe)); @@ -1498,8 +1498,8 @@ public abstract class TaskAttemptImpl implements result.phase = Phase.STARTING; result.stateString = "NEW"; result.taskState = TaskAttemptState.NEW; - Counters counters = recordFactory.newRecordInstance(Counters.class); -// counters.groups = new HashMap(); + Counters counters = EMPTY_COUNTERS; + // counters.groups = new HashMap(); result.counters = counters; } diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TaskImpl.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TaskImpl.java index b403751154b..e376c9e887d 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TaskImpl.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TaskImpl.java @@ -33,6 +33,7 @@ import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.fs.Path; import org.apache.hadoop.mapred.JobConf; +import org.apache.hadoop.mapreduce.Counters; import org.apache.hadoop.mapreduce.OutputCommitter; import org.apache.hadoop.mapreduce.TypeConverter; import org.apache.hadoop.mapreduce.jobhistory.JobHistoryEvent; @@ -40,7 +41,6 @@ import org.apache.hadoop.mapreduce.jobhistory.TaskFailedEvent; import org.apache.hadoop.mapreduce.jobhistory.TaskFinishedEvent; import org.apache.hadoop.mapreduce.jobhistory.TaskStartedEvent; import org.apache.hadoop.mapreduce.security.token.JobTokenIdentifier; -import org.apache.hadoop.mapreduce.v2.api.records.Counters; import org.apache.hadoop.mapreduce.v2.api.records.JobId; import org.apache.hadoop.mapreduce.v2.api.records.TaskAttemptCompletionEvent; import org.apache.hadoop.mapreduce.v2.api.records.TaskAttemptCompletionEventStatus; @@ -329,7 +329,6 @@ public abstract class TaskImpl implements Task, EventHandler { report.setFinishTime(getFinishTime()); report.setTaskState(getState()); report.setProgress(getProgress()); - report.setCounters(getCounters()); for (TaskAttempt attempt : attempts.values()) { if (TaskAttemptState.RUNNING.equals(attempt.getState())) { @@ -346,6 +345,11 @@ public abstract class TaskImpl implements Task, EventHandler { } } + + // Add a copy of counters as the last step so that their lifetime on heap + // is as small as possible. + report.setCounters(TypeConverter.toYarn(getCounters())); + return report; } finally { readLock.unlock(); @@ -361,7 +365,7 @@ public abstract class TaskImpl implements Task, EventHandler { if (bestAttempt != null) { counters = bestAttempt.getCounters(); } else { - counters = recordFactory.newRecordInstance(Counters.class); + counters = TaskAttemptImpl.EMPTY_COUNTERS; // counters.groups = new HashMap(); } return counters; @@ -595,7 +599,7 @@ public abstract class TaskImpl implements Task, EventHandler { task.getFinishTime(task.successfulAttempt), TypeConverter.fromYarn(task.taskId.getTaskType()), taskState.toString(), - TypeConverter.fromYarn(task.getCounters())); + task.getCounters()); return tfe; } diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/recover/RecoveryService.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/recover/RecoveryService.java index 30cbdae67b0..775cc11571e 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/recover/RecoveryService.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/recover/RecoveryService.java @@ -92,7 +92,6 @@ import org.apache.hadoop.yarn.util.ConverterUtils; //TODO: //task cleanup for all non completed tasks - public class RecoveryService extends CompositeService implements Recovery { private static final Log LOG = LogFactory.getLog(RecoveryService.class); @@ -411,8 +410,7 @@ public class RecoveryService extends CompositeService implements Recovery { if (cntrs == null) { taskAttemptStatus.counters = null; } else { - taskAttemptStatus.counters = TypeConverter.toYarn(attemptInfo - .getCounters()); + taskAttemptStatus.counters = cntrs; } actualHandler.handle(new TaskAttemptStatusUpdateEvent( taskAttemptStatus.id, taskAttemptStatus)); diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/webapp/CountersBlock.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/webapp/CountersBlock.java index 6accd8add73..ec02ef5e896 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/webapp/CountersBlock.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/webapp/CountersBlock.java @@ -18,25 +18,32 @@ package org.apache.hadoop.mapreduce.v2.app.webapp; -import com.google.inject.Inject; +import static org.apache.hadoop.mapreduce.v2.app.webapp.AMParams.JOB_ID; +import static org.apache.hadoop.mapreduce.v2.app.webapp.AMParams.TASK_ID; +import static org.apache.hadoop.yarn.webapp.view.JQueryUI.C_TABLE; +import static org.apache.hadoop.yarn.webapp.view.JQueryUI._INFO_WRAP; + import java.util.Map; -import org.apache.hadoop.mapreduce.v2.api.records.Counter; -import org.apache.hadoop.mapreduce.v2.api.records.CounterGroup; -import org.apache.hadoop.mapreduce.v2.api.records.Counters; +import org.apache.hadoop.mapreduce.Counter; +import org.apache.hadoop.mapreduce.CounterGroup; +import org.apache.hadoop.mapreduce.Counters; import org.apache.hadoop.mapreduce.v2.api.records.JobId; import org.apache.hadoop.mapreduce.v2.api.records.TaskId; import org.apache.hadoop.mapreduce.v2.app.AppContext; import org.apache.hadoop.mapreduce.v2.app.job.Job; import org.apache.hadoop.mapreduce.v2.app.job.Task; -import org.apache.hadoop.mapreduce.v2.app.job.impl.JobImpl; import org.apache.hadoop.mapreduce.v2.util.MRApps; import org.apache.hadoop.yarn.webapp.hamlet.Hamlet; -import org.apache.hadoop.yarn.webapp.hamlet.Hamlet.*; +import org.apache.hadoop.yarn.webapp.hamlet.Hamlet.DIV; +import org.apache.hadoop.yarn.webapp.hamlet.Hamlet.TABLE; +import org.apache.hadoop.yarn.webapp.hamlet.Hamlet.TBODY; +import org.apache.hadoop.yarn.webapp.hamlet.Hamlet.TD; +import org.apache.hadoop.yarn.webapp.hamlet.Hamlet.THEAD; +import org.apache.hadoop.yarn.webapp.hamlet.Hamlet.TR; import org.apache.hadoop.yarn.webapp.view.HtmlBlock; -import static org.apache.hadoop.mapreduce.v2.app.webapp.AMWebApp.*; -import static org.apache.hadoop.yarn.webapp.view.JQueryUI.*; +import com.google.inject.Inject; public class CountersBlock extends HtmlBlock { Job job; @@ -62,8 +69,7 @@ public class CountersBlock extends HtmlBlock { return; } - if(total == null || total.getAllCounterGroups() == null || - total.getAllCounterGroups().size() <= 0) { + if(total == null || total.getGroupNames() == null) { String type = $(TASK_ID); if(type == null || type.isEmpty()) { type = $(JOB_ID, "the job"); @@ -93,9 +99,9 @@ public class CountersBlock extends HtmlBlock { th(".group.ui-state-default", "Counter Group"). th(".ui-state-default", "Counters")._()._(). tbody(); - for (CounterGroup g : total.getAllCounterGroups().values()) { - CounterGroup mg = map == null ? null : map.getCounterGroup(g.getName()); - CounterGroup rg = reduce == null ? null : reduce.getCounterGroup(g.getName()); + for (CounterGroup g : total) { + CounterGroup mg = map == null ? null : map.getGroup(g.getName()); + CounterGroup rg = reduce == null ? null : reduce.getGroup(g.getName()); ++numGroups; // This is mostly for demonstration :) Typically we'd introduced // a CounterGroup block to reduce the verbosity. OTOH, this @@ -116,7 +122,7 @@ public class CountersBlock extends HtmlBlock { TBODY>>>>>> group = groupHeadRow. th(map == null ? "Value" : "Total")._()._(). tbody(); - for (Counter counter : g.getAllCounters().values()) { + for (Counter counter : g) { // Ditto TR>>>>>>> groupRow = group. tr(); @@ -130,8 +136,8 @@ public class CountersBlock extends HtmlBlock { _(); } if (map != null) { - Counter mc = mg == null ? null : mg.getCounter(counter.getName()); - Counter rc = rg == null ? null : rg.getCounter(counter.getName()); + Counter mc = mg == null ? null : mg.findCounter(counter.getName()); + Counter rc = rg == null ? null : rg.findCounter(counter.getName()); groupRow. td(mc == null ? "0" : String.valueOf(mc.getValue())). td(rc == null ? "0" : String.valueOf(rc.getValue())); @@ -173,14 +179,14 @@ public class CountersBlock extends HtmlBlock { } // Get all types of counters Map tasks = job.getTasks(); - total = job.getCounters(); - map = JobImpl.newCounters(); - reduce = JobImpl.newCounters(); + total = job.getAllCounters(); + map = new Counters(); + reduce = new Counters(); for (Task t : tasks.values()) { Counters counters = t.getCounters(); switch (t.getType()) { - case MAP: JobImpl.incrAllCounters(map, counters); break; - case REDUCE: JobImpl.incrAllCounters(reduce, counters); break; + case MAP: map.incrAllCounters(counters); break; + case REDUCE: reduce.incrAllCounters(counters); break; } } } diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/webapp/SingleCounterBlock.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/webapp/SingleCounterBlock.java index bb728225428..cf083ccb279 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/webapp/SingleCounterBlock.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/webapp/SingleCounterBlock.java @@ -18,13 +18,18 @@ package org.apache.hadoop.mapreduce.v2.app.webapp; -import com.google.inject.Inject; +import static org.apache.hadoop.mapreduce.v2.app.webapp.AMParams.COUNTER_GROUP; +import static org.apache.hadoop.mapreduce.v2.app.webapp.AMParams.COUNTER_NAME; +import static org.apache.hadoop.mapreduce.v2.app.webapp.AMParams.JOB_ID; +import static org.apache.hadoop.mapreduce.v2.app.webapp.AMParams.TASK_ID; +import static org.apache.hadoop.yarn.webapp.view.JQueryUI._INFO_WRAP; + import java.util.Map; import java.util.TreeMap; -import org.apache.hadoop.mapreduce.v2.api.records.Counter; -import org.apache.hadoop.mapreduce.v2.api.records.CounterGroup; -import org.apache.hadoop.mapreduce.v2.api.records.Counters; +import org.apache.hadoop.mapreduce.Counter; +import org.apache.hadoop.mapreduce.CounterGroup; +import org.apache.hadoop.mapreduce.Counters; import org.apache.hadoop.mapreduce.v2.api.records.JobId; import org.apache.hadoop.mapreduce.v2.api.records.TaskAttemptId; import org.apache.hadoop.mapreduce.v2.api.records.TaskId; @@ -40,8 +45,7 @@ import org.apache.hadoop.yarn.webapp.hamlet.Hamlet.TBODY; import org.apache.hadoop.yarn.webapp.hamlet.Hamlet.TR; import org.apache.hadoop.yarn.webapp.view.HtmlBlock; -import static org.apache.hadoop.mapreduce.v2.app.webapp.AMWebApp.*; -import static org.apache.hadoop.yarn.webapp.view.JQueryUI.*; +import com.google.inject.Inject; public class SingleCounterBlock extends HtmlBlock { protected TreeMap values = new TreeMap(); @@ -122,10 +126,10 @@ public class SingleCounterBlock extends HtmlBlock { task.getAttempts().entrySet()) { long value = 0; Counters counters = entry.getValue().getCounters(); - CounterGroup group = (counters != null) - ? counters.getCounterGroup($(COUNTER_GROUP)) : null; + CounterGroup group = (counters != null) ? counters + .getGroup($(COUNTER_GROUP)) : null; if(group != null) { - Counter c = group.getCounter($(COUNTER_NAME)); + Counter c = group.findCounter($(COUNTER_NAME)); if(c != null) { value = c.getValue(); } @@ -140,9 +144,9 @@ public class SingleCounterBlock extends HtmlBlock { for(Map.Entry entry : tasks.entrySet()) { long value = 0; CounterGroup group = entry.getValue().getCounters() - .getCounterGroup($(COUNTER_GROUP)); + .getGroup($(COUNTER_GROUP)); if(group != null) { - Counter c = group.getCounter($(COUNTER_NAME)); + Counter c = group.findCounter($(COUNTER_NAME)); if(c != null) { value = c.getValue(); } diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/webapp/dao/CounterGroupInfo.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/webapp/dao/CounterGroupInfo.java index 99009ca1f6b..518d19e788d 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/webapp/dao/CounterGroupInfo.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/webapp/dao/CounterGroupInfo.java @@ -24,8 +24,8 @@ import javax.xml.bind.annotation.XmlAccessorType; import javax.xml.bind.annotation.XmlElement; import javax.xml.bind.annotation.XmlRootElement; -import org.apache.hadoop.mapreduce.v2.api.records.Counter; -import org.apache.hadoop.mapreduce.v2.api.records.CounterGroup; +import org.apache.hadoop.mapreduce.Counter; +import org.apache.hadoop.mapreduce.CounterGroup; @XmlRootElement(name = "counterGroup") @XmlAccessorType(XmlAccessType.FIELD) @@ -38,14 +38,14 @@ public class CounterGroupInfo { public CounterGroupInfo() { } - public CounterGroupInfo(String name, CounterGroup g, CounterGroup mg, + public CounterGroupInfo(String name, CounterGroup group, CounterGroup mg, CounterGroup rg) { this.counterGroupName = name; this.counter = new ArrayList(); - for (Counter c : g.getAllCounters().values()) { - Counter mc = mg == null ? null : mg.getCounter(c.getName()); - Counter rc = rg == null ? null : rg.getCounter(c.getName()); + for (Counter c : group) { + Counter mc = mg == null ? null : mg.findCounter(c.getName()); + Counter rc = rg == null ? null : rg.findCounter(c.getName()); CounterInfo cinfo = new CounterInfo(c, mc, rc); this.counter.add(cinfo); } diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/webapp/dao/CounterInfo.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/webapp/dao/CounterInfo.java index e632f344bb9..bae21fdeb38 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/webapp/dao/CounterInfo.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/webapp/dao/CounterInfo.java @@ -21,7 +21,7 @@ import javax.xml.bind.annotation.XmlAccessType; import javax.xml.bind.annotation.XmlAccessorType; import javax.xml.bind.annotation.XmlRootElement; -import org.apache.hadoop.mapreduce.v2.api.records.Counter; +import org.apache.hadoop.mapreduce.Counter; @XmlRootElement @XmlAccessorType(XmlAccessType.FIELD) @@ -35,9 +35,9 @@ public class CounterInfo { public CounterInfo() { } - public CounterInfo(Counter counter, Counter mc, Counter rc) { - this.name = counter.getName(); - this.totalCounterValue = counter.getValue(); + public CounterInfo(Counter c, Counter mc, Counter rc) { + this.name = c.getName(); + this.totalCounterValue = c.getValue(); this.mapCounterValue = mc == null ? 0 : mc.getValue(); this.reduceCounterValue = rc == null ? 0 : rc.getValue(); } diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/webapp/dao/JobCounterInfo.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/webapp/dao/JobCounterInfo.java index 73dc2065543..6dbc9189b79 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/webapp/dao/JobCounterInfo.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/webapp/dao/JobCounterInfo.java @@ -25,13 +25,12 @@ import javax.xml.bind.annotation.XmlAccessorType; import javax.xml.bind.annotation.XmlRootElement; import javax.xml.bind.annotation.XmlTransient; -import org.apache.hadoop.mapreduce.v2.api.records.CounterGroup; -import org.apache.hadoop.mapreduce.v2.api.records.Counters; +import org.apache.hadoop.mapreduce.CounterGroup; +import org.apache.hadoop.mapreduce.Counters; import org.apache.hadoop.mapreduce.v2.api.records.TaskId; import org.apache.hadoop.mapreduce.v2.app.AppContext; import org.apache.hadoop.mapreduce.v2.app.job.Job; import org.apache.hadoop.mapreduce.v2.app.job.Task; -import org.apache.hadoop.mapreduce.v2.app.job.impl.JobImpl; import org.apache.hadoop.mapreduce.v2.util.MRApps; @XmlRootElement(name = "jobCounters") @@ -56,18 +55,15 @@ public class JobCounterInfo { counterGroup = new ArrayList(); this.id = MRApps.toString(job.getID()); - int numGroups = 0; - if (total != null) { - for (CounterGroup g : total.getAllCounterGroups().values()) { + for (CounterGroup g : total) { if (g != null) { - CounterGroup mg = map == null ? null : map.getCounterGroup(g - .getName()); - CounterGroup rg = reduce == null ? null : reduce.getCounterGroup(g - .getName()); - ++numGroups; + CounterGroup mg = map == null ? null : map.getGroup(g.getName()); + CounterGroup rg = reduce == null ? null : reduce + .getGroup(g.getName()); - CounterGroupInfo cginfo = new CounterGroupInfo(g.getName(), g, mg, rg); + CounterGroupInfo cginfo = new CounterGroupInfo(g.getName(), g, + mg, rg); counterGroup.add(cginfo); } } @@ -75,23 +71,23 @@ public class JobCounterInfo { } private void getCounters(AppContext ctx, Job job) { - total = JobImpl.newCounters(); + total = new Counters(); if (job == null) { return; } - map = JobImpl.newCounters(); - reduce = JobImpl.newCounters(); + map = new Counters(); + reduce = new Counters(); // Get all types of counters Map tasks = job.getTasks(); for (Task t : tasks.values()) { Counters counters = t.getCounters(); - JobImpl.incrAllCounters(total, counters); + total.incrAllCounters(counters); switch (t.getType()) { case MAP: - JobImpl.incrAllCounters(map, counters); + map.incrAllCounters(counters); break; case REDUCE: - JobImpl.incrAllCounters(reduce, counters); + reduce.incrAllCounters(counters); break; } } diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/webapp/dao/JobTaskAttemptCounterInfo.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/webapp/dao/JobTaskAttemptCounterInfo.java index a14e5feb878..2026c76ddbc 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/webapp/dao/JobTaskAttemptCounterInfo.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/webapp/dao/JobTaskAttemptCounterInfo.java @@ -25,8 +25,8 @@ import javax.xml.bind.annotation.XmlAccessorType; import javax.xml.bind.annotation.XmlRootElement; import javax.xml.bind.annotation.XmlTransient; -import org.apache.hadoop.mapreduce.v2.api.records.CounterGroup; -import org.apache.hadoop.mapreduce.v2.api.records.Counters; +import org.apache.hadoop.mapreduce.CounterGroup; +import org.apache.hadoop.mapreduce.Counters; import org.apache.hadoop.mapreduce.v2.app.job.TaskAttempt; import org.apache.hadoop.mapreduce.v2.util.MRApps; @@ -49,7 +49,7 @@ public class JobTaskAttemptCounterInfo { total = taskattempt.getCounters(); taskAttemptCounterGroup = new ArrayList(); if (total != null) { - for (CounterGroup g : total.getAllCounterGroups().values()) { + for (CounterGroup g : total) { if (g != null) { TaskCounterGroupInfo cginfo = new TaskCounterGroupInfo(g.getName(), g); if (cginfo != null) { diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/webapp/dao/JobTaskCounterInfo.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/webapp/dao/JobTaskCounterInfo.java index bcdde8c03cb..dad213f8479 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/webapp/dao/JobTaskCounterInfo.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/webapp/dao/JobTaskCounterInfo.java @@ -25,8 +25,8 @@ import javax.xml.bind.annotation.XmlAccessorType; import javax.xml.bind.annotation.XmlRootElement; import javax.xml.bind.annotation.XmlTransient; -import org.apache.hadoop.mapreduce.v2.api.records.CounterGroup; -import org.apache.hadoop.mapreduce.v2.api.records.Counters; +import org.apache.hadoop.mapreduce.CounterGroup; +import org.apache.hadoop.mapreduce.Counters; import org.apache.hadoop.mapreduce.v2.app.job.Task; import org.apache.hadoop.mapreduce.v2.util.MRApps; @@ -48,7 +48,7 @@ public class JobTaskCounterInfo { this.id = MRApps.toString(task.getID()); taskCounterGroup = new ArrayList(); if (total != null) { - for (CounterGroup g : total.getAllCounterGroups().values()) { + for (CounterGroup g : total) { if (g != null) { TaskCounterGroupInfo cginfo = new TaskCounterGroupInfo(g.getName(), g); taskCounterGroup.add(cginfo); diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/webapp/dao/TaskCounterGroupInfo.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/webapp/dao/TaskCounterGroupInfo.java index fa9dfcb7a4a..fed2db811ed 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/webapp/dao/TaskCounterGroupInfo.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/webapp/dao/TaskCounterGroupInfo.java @@ -24,8 +24,8 @@ import javax.xml.bind.annotation.XmlAccessType; import javax.xml.bind.annotation.XmlAccessorType; import javax.xml.bind.annotation.XmlRootElement; -import org.apache.hadoop.mapreduce.v2.api.records.Counter; -import org.apache.hadoop.mapreduce.v2.api.records.CounterGroup; +import org.apache.hadoop.mapreduce.Counter; +import org.apache.hadoop.mapreduce.CounterGroup; @XmlRootElement @XmlAccessorType(XmlAccessType.FIELD) @@ -37,11 +37,11 @@ public class TaskCounterGroupInfo { public TaskCounterGroupInfo() { } - public TaskCounterGroupInfo(String name, CounterGroup g) { + public TaskCounterGroupInfo(String name, CounterGroup group) { this.counterGroupName = name; this.counter = new ArrayList(); - for (Counter c : g.getAllCounters().values()) { + for (Counter c : group) { TaskCounterInfo cinfo = new TaskCounterInfo(c.getName(), c.getValue()); this.counter.add(cinfo); } diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/MockJobs.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/MockJobs.java index 183f589aa6f..c8e20f64577 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/MockJobs.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/MockJobs.java @@ -30,6 +30,7 @@ import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.Path; import org.apache.hadoop.mapred.JobACLsManager; import org.apache.hadoop.mapred.ShuffleHandler; +import org.apache.hadoop.mapreduce.Counters; import org.apache.hadoop.mapreduce.FileSystemCounter; import org.apache.hadoop.mapreduce.JobACL; import org.apache.hadoop.mapreduce.JobCounter; @@ -37,7 +38,6 @@ import org.apache.hadoop.mapreduce.MRConfig; import org.apache.hadoop.mapreduce.TaskCounter; import org.apache.hadoop.mapreduce.TypeConverter; import org.apache.hadoop.mapreduce.v2.api.records.AMInfo; -import org.apache.hadoop.mapreduce.v2.api.records.Counters; import org.apache.hadoop.mapreduce.v2.api.records.JobId; import org.apache.hadoop.mapreduce.v2.api.records.JobReport; import org.apache.hadoop.mapreduce.v2.api.records.JobState; @@ -144,7 +144,7 @@ public class MockJobs extends MockApps { report.setFinishTime(System.currentTimeMillis() + (int) (Math.random() * DT) + 1); report.setProgress((float) Math.random()); - report.setCounters(newCounters()); + report.setCounters(TypeConverter.toYarn(newCounters())); report.setTaskState(TASK_STATES.next()); return report; } @@ -159,13 +159,12 @@ public class MockJobs extends MockApps { report.setPhase(PHASES.next()); report.setTaskAttemptState(TASK_ATTEMPT_STATES.next()); report.setProgress((float) Math.random()); - report.setCounters(newCounters()); + report.setCounters(TypeConverter.toYarn(newCounters())); return report; } - @SuppressWarnings("deprecation") public static Counters newCounters() { - org.apache.hadoop.mapred.Counters hc = new org.apache.hadoop.mapred.Counters(); + Counters hc = new Counters(); for (JobCounter c : JobCounter.values()) { hc.findCounter(c).setValue((long) (Math.random() * 1000)); } @@ -183,7 +182,7 @@ public class MockJobs extends MockApps { hc.findCounter(USER_COUNTER_GROUPS.next(), USER_COUNTERS.next()) .setValue((long) (Math.random() * 100000)); } - return TypeConverter.toYarn(hc); + return hc; } public static Map newTaskAttempts(TaskId tid, @@ -231,7 +230,10 @@ public class MockJobs extends MockApps { @Override public Counters getCounters() { - return report.getCounters(); + if (report != null && report.getCounters() != null) { + return new Counters(TypeConverter.fromYarn(report.getCounters())); + } + return null; } @Override @@ -327,7 +329,8 @@ public class MockJobs extends MockApps { @Override public Counters getCounters() { - return report.getCounters(); + return new Counters( + TypeConverter.fromYarn(report.getCounters())); } @Override @@ -373,8 +376,9 @@ public class MockJobs extends MockApps { }; } - public static Counters getCounters(Collection tasks) { - Counters counters = JobImpl.newCounters(); + public static Counters getCounters( + Collection tasks) { + Counters counters = new Counters(); return JobImpl.incrTaskCounters(counters, tasks); } @@ -419,7 +423,8 @@ public class MockJobs extends MockApps { final JobReport report = newJobReport(id); final Map tasks = newTasks(id, n, m); final TaskCount taskCount = getTaskCount(tasks.values()); - final Counters counters = getCounters(tasks.values()); + final Counters counters = getCounters(tasks + .values()); final Path configFile = confFile; Map tmpJobACLs = new HashMap(); @@ -457,7 +462,7 @@ public class MockJobs extends MockApps { } @Override - public Counters getCounters() { + public Counters getAllCounters() { return counters; } diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestRuntimeEstimators.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestRuntimeEstimators.java index dd0a7f1c7b6..4df8df27e13 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestRuntimeEstimators.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestRuntimeEstimators.java @@ -18,9 +18,6 @@ package org.apache.hadoop.mapreduce.v2.app; -import org.apache.commons.logging.Log; -import org.apache.commons.logging.LogFactory; - import java.util.Collection; import java.util.Collections; import java.util.HashMap; @@ -30,11 +27,14 @@ import java.util.Map; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicLong; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.Path; +import org.apache.hadoop.mapreduce.Counters; import org.apache.hadoop.mapreduce.JobACL; import org.apache.hadoop.mapreduce.v2.api.records.AMInfo; -import org.apache.hadoop.mapreduce.v2.api.records.Counters; import org.apache.hadoop.mapreduce.v2.api.records.JobId; import org.apache.hadoop.mapreduce.v2.api.records.JobReport; import org.apache.hadoop.mapreduce.v2.api.records.JobState; @@ -46,13 +46,12 @@ import org.apache.hadoop.mapreduce.v2.api.records.TaskId; import org.apache.hadoop.mapreduce.v2.api.records.TaskReport; import org.apache.hadoop.mapreduce.v2.api.records.TaskState; import org.apache.hadoop.mapreduce.v2.api.records.TaskType; -import org.apache.hadoop.mapreduce.v2.app.AppContext; import org.apache.hadoop.mapreduce.v2.app.job.Job; import org.apache.hadoop.mapreduce.v2.app.job.Task; import org.apache.hadoop.mapreduce.v2.app.job.TaskAttempt; +import org.apache.hadoop.mapreduce.v2.app.job.event.TaskAttemptStatusUpdateEvent.TaskAttemptStatus; import org.apache.hadoop.mapreduce.v2.app.job.event.TaskEvent; import org.apache.hadoop.mapreduce.v2.app.job.event.TaskEventType; -import org.apache.hadoop.mapreduce.v2.app.job.event.TaskAttemptStatusUpdateEvent.TaskAttemptStatus; import org.apache.hadoop.mapreduce.v2.app.speculate.DefaultSpeculator; import org.apache.hadoop.mapreduce.v2.app.speculate.ExponentiallySmoothedTaskRuntimeEstimator; import org.apache.hadoop.mapreduce.v2.app.speculate.LegacyTaskRuntimeEstimator; @@ -74,7 +73,7 @@ import org.apache.hadoop.yarn.service.CompositeService; import org.junit.Assert; import org.junit.Test; - +@SuppressWarnings({"unchecked", "rawtypes"}) public class TestRuntimeEstimators { private static int INITIAL_NUMBER_FREE_SLOTS = 600; @@ -399,7 +398,7 @@ public class TestRuntimeEstimators { } @Override - public Counters getCounters() { + public Counters getAllCounters() { throw new UnsupportedOperationException("Not supported yet."); } diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/webapp/TestAMWebServicesAttempts.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/webapp/TestAMWebServicesAttempts.java index 215d7718e15..ee824ee10a1 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/webapp/TestAMWebServicesAttempts.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/webapp/TestAMWebServicesAttempts.java @@ -686,7 +686,7 @@ public class TestAMWebServicesAttempts extends JerseyTest { assertTrue("name not set", (name != null && !name.isEmpty())); JSONArray counters = counterGroup.getJSONArray("counter"); for (int j = 0; j < counters.length(); j++) { - JSONObject counter = counters.getJSONObject(i); + JSONObject counter = counters.getJSONObject(j); String counterName = counter.getString("name"); assertTrue("name not set", (counterName != null && !counterName.isEmpty())); diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/webapp/TestAMWebServicesTasks.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/webapp/TestAMWebServicesTasks.java index 19e626ae6c6..e3fdd932cf7 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/webapp/TestAMWebServicesTasks.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/webapp/TestAMWebServicesTasks.java @@ -774,7 +774,7 @@ public class TestAMWebServicesTasks extends JerseyTest { assertTrue("name not set", (name != null && !name.isEmpty())); JSONArray counters = counterGroup.getJSONArray("counter"); for (int j = 0; j < counters.length(); j++) { - JSONObject counter = counters.getJSONObject(i); + JSONObject counter = counters.getJSONObject(j); String counterName = counter.getString("name"); assertTrue("name not set", (counterName != null && !counterName.isEmpty())); diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-common/src/main/java/org/apache/hadoop/mapreduce/TypeConverter.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-common/src/main/java/org/apache/hadoop/mapreduce/TypeConverter.java index d3ebee62b2a..ebdb4160ee8 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-common/src/main/java/org/apache/hadoop/mapreduce/TypeConverter.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-common/src/main/java/org/apache/hadoop/mapreduce/TypeConverter.java @@ -22,7 +22,6 @@ import java.util.ArrayList; import java.util.HashMap; import java.util.List; -import org.apache.commons.lang.StringUtils; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.mapred.JobPriority; import org.apache.hadoop.mapred.TaskCompletionEvent; @@ -45,14 +44,15 @@ import org.apache.hadoop.mapreduce.v2.util.MRApps; import org.apache.hadoop.yarn.YarnException; import org.apache.hadoop.yarn.api.records.ApplicationId; import org.apache.hadoop.yarn.api.records.ApplicationReport; -import org.apache.hadoop.yarn.api.records.YarnApplicationState; import org.apache.hadoop.yarn.api.records.NodeReport; import org.apache.hadoop.yarn.api.records.QueueACL; import org.apache.hadoop.yarn.api.records.QueueState; import org.apache.hadoop.yarn.api.records.QueueUserACLInfo; +import org.apache.hadoop.yarn.api.records.YarnApplicationState; import org.apache.hadoop.yarn.factories.RecordFactory; import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider; +@SuppressWarnings("deprecation") public class TypeConverter { private static RecordFactory recordFactory; diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/jobhistory/JobFinishedEvent.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/jobhistory/JobFinishedEvent.java index 8b2ba0d0197..e85805c8d8a 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/jobhistory/JobFinishedEvent.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/jobhistory/JobFinishedEvent.java @@ -18,15 +18,12 @@ package org.apache.hadoop.mapreduce.jobhistory; -import java.io.IOException; - +import org.apache.avro.util.Utf8; import org.apache.hadoop.classification.InterfaceAudience; import org.apache.hadoop.classification.InterfaceStability; import org.apache.hadoop.mapreduce.Counters; import org.apache.hadoop.mapreduce.JobID; -import org.apache.avro.util.Utf8; - /** * Event to record successful completion of job * @@ -34,7 +31,18 @@ import org.apache.avro.util.Utf8; @InterfaceAudience.Private @InterfaceStability.Unstable public class JobFinishedEvent implements HistoryEvent { - private JobFinished datum = new JobFinished(); + + private JobFinished datum = null; + + private JobID jobId; + private long finishTime; + private int finishedMaps; + private int finishedReduces; + private int failedMaps; + private int failedReduces; + private Counters mapCounters; + private Counters reduceCounters; + private Counters totalCounters; /** * Create an event to record successful job completion @@ -53,50 +61,75 @@ public class JobFinishedEvent implements HistoryEvent { int failedMaps, int failedReduces, Counters mapCounters, Counters reduceCounters, Counters totalCounters) { - datum.jobid = new Utf8(id.toString()); - datum.finishTime = finishTime; - datum.finishedMaps = finishedMaps; - datum.finishedReduces = finishedReduces; - datum.failedMaps = failedMaps; - datum.failedReduces = failedReduces; - datum.mapCounters = - EventWriter.toAvro(mapCounters, "MAP_COUNTERS"); - datum.reduceCounters = - EventWriter.toAvro(reduceCounters, "REDUCE_COUNTERS"); - datum.totalCounters = - EventWriter.toAvro(totalCounters, "TOTAL_COUNTERS"); + this.jobId = id; + this.finishTime = finishTime; + this.finishedMaps = finishedMaps; + this.finishedReduces = finishedReduces; + this.failedMaps = failedMaps; + this.failedReduces = failedReduces; + this.mapCounters = mapCounters; + this.reduceCounters = reduceCounters; + this.totalCounters = totalCounters; } JobFinishedEvent() {} - public Object getDatum() { return datum; } - public void setDatum(Object datum) { this.datum = (JobFinished)datum; } + public Object getDatum() { + if (datum == null) { + datum = new JobFinished(); + datum.jobid = new Utf8(jobId.toString()); + datum.finishTime = finishTime; + datum.finishedMaps = finishedMaps; + datum.finishedReduces = finishedReduces; + datum.failedMaps = failedMaps; + datum.failedReduces = failedReduces; + datum.mapCounters = EventWriter.toAvro(mapCounters, "MAP_COUNTERS"); + datum.reduceCounters = EventWriter.toAvro(reduceCounters, + "REDUCE_COUNTERS"); + datum.totalCounters = EventWriter.toAvro(totalCounters, "TOTAL_COUNTERS"); + } + return datum; + } + + public void setDatum(Object oDatum) { + this.datum = (JobFinished) oDatum; + this.jobId = JobID.forName(datum.jobid.toString()); + this.finishTime = datum.finishTime; + this.finishedMaps = datum.finishedMaps; + this.finishedReduces = datum.finishedReduces; + this.failedMaps = datum.failedMaps; + this.failedReduces = datum.failedReduces; + this.mapCounters = EventReader.fromAvro(datum.mapCounters); + this.reduceCounters = EventReader.fromAvro(datum.reduceCounters); + this.totalCounters = EventReader.fromAvro(datum.totalCounters); + } + public EventType getEventType() { return EventType.JOB_FINISHED; } /** Get the Job ID */ - public JobID getJobid() { return JobID.forName(datum.jobid.toString()); } + public JobID getJobid() { return jobId; } /** Get the job finish time */ - public long getFinishTime() { return datum.finishTime; } + public long getFinishTime() { return finishTime; } /** Get the number of finished maps for the job */ - public int getFinishedMaps() { return datum.finishedMaps; } + public int getFinishedMaps() { return finishedMaps; } /** Get the number of finished reducers for the job */ - public int getFinishedReduces() { return datum.finishedReduces; } + public int getFinishedReduces() { return finishedReduces; } /** Get the number of failed maps for the job */ - public int getFailedMaps() { return datum.failedMaps; } + public int getFailedMaps() { return failedMaps; } /** Get the number of failed reducers for the job */ - public int getFailedReduces() { return datum.failedReduces; } + public int getFailedReduces() { return failedReduces; } /** Get the counters for the job */ public Counters getTotalCounters() { - return EventReader.fromAvro(datum.totalCounters); + return totalCounters; } /** Get the Map counters for the job */ public Counters getMapCounters() { - return EventReader.fromAvro(datum.mapCounters); + return mapCounters; } /** Get the reduce counters for the job */ public Counters getReduceCounters() { - return EventReader.fromAvro(datum.reduceCounters); + return reduceCounters; } } diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/jobhistory/MapAttemptFinishedEvent.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/jobhistory/MapAttemptFinishedEvent.java index 5838c971f37..d4f9fa316b9 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/jobhistory/MapAttemptFinishedEvent.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/jobhistory/MapAttemptFinishedEvent.java @@ -34,8 +34,25 @@ import org.apache.hadoop.mapreduce.TaskType; @InterfaceAudience.Private @InterfaceStability.Unstable public class MapAttemptFinishedEvent implements HistoryEvent { - private MapAttemptFinished datum = new MapAttemptFinished(); - + + private MapAttemptFinished datum = null; + + private TaskAttemptID attemptId; + private TaskType taskType; + private String taskStatus; + private long finishTime; + private String hostname; + private String rackName; + private int port; + private long mapFinishTime; + private String state; + private Counters counters; + int[][] allSplits; + int[] clockSplits; + int[] cpuUsages; + int[] vMemKbytes; + int[] physMemKbytes; + /** * Create an event for successful completion of map attempts * @param id Task Attempt ID @@ -60,33 +77,21 @@ public class MapAttemptFinishedEvent implements HistoryEvent { (TaskAttemptID id, TaskType taskType, String taskStatus, long mapFinishTime, long finishTime, String hostname, int port, String rackName, String state, Counters counters, int[][] allSplits) { - datum.taskid = new Utf8(id.getTaskID().toString()); - datum.attemptId = new Utf8(id.toString()); - datum.taskType = new Utf8(taskType.name()); - datum.taskStatus = new Utf8(taskStatus); - datum.mapFinishTime = mapFinishTime; - datum.finishTime = finishTime; - datum.hostname = new Utf8(hostname); - datum.port = port; - // This is needed for reading old jh files - if (rackName != null) { - datum.rackname = new Utf8(rackName); - } - datum.state = new Utf8(state); - datum.counters = EventWriter.toAvro(counters); - - datum.clockSplits - = AvroArrayUtils.toAvro - (ProgressSplitsBlock.arrayGetWallclockTime(allSplits)); - datum.cpuUsages - = AvroArrayUtils.toAvro - (ProgressSplitsBlock.arrayGetCPUTime(allSplits)); - datum.vMemKbytes - = AvroArrayUtils.toAvro - (ProgressSplitsBlock.arrayGetVMemKbytes(allSplits)); - datum.physMemKbytes - = AvroArrayUtils.toAvro - (ProgressSplitsBlock.arrayGetPhysMemKbytes(allSplits)); + this.attemptId = id; + this.taskType = taskType; + this.taskStatus = taskStatus; + this.mapFinishTime = mapFinishTime; + this.finishTime = finishTime; + this.hostname = hostname; + this.rackName = rackName; + this.port = port; + this.state = state; + this.counters = counters; + this.allSplits = allSplits; + this.clockSplits = ProgressSplitsBlock.arrayGetWallclockTime(allSplits); + this.cpuUsages = ProgressSplitsBlock.arrayGetCPUTime(allSplits); + this.vMemKbytes = ProgressSplitsBlock.arrayGetVMemKbytes(allSplits); + this.physMemKbytes = ProgressSplitsBlock.arrayGetPhysMemKbytes(allSplits); } /** @@ -117,57 +122,100 @@ public class MapAttemptFinishedEvent implements HistoryEvent { MapAttemptFinishedEvent() {} - public Object getDatum() { return datum; } - public void setDatum(Object datum) { - this.datum = (MapAttemptFinished)datum; + public Object getDatum() { + if (datum == null) { + datum = new MapAttemptFinished(); + datum.taskid = new Utf8(attemptId.getTaskID().toString()); + datum.attemptId = new Utf8(attemptId.toString()); + datum.taskType = new Utf8(taskType.name()); + datum.taskStatus = new Utf8(taskStatus); + datum.mapFinishTime = mapFinishTime; + datum.finishTime = finishTime; + datum.hostname = new Utf8(hostname); + datum.port = port; + if (rackName != null) { + datum.rackname = new Utf8(rackName); + } + datum.state = new Utf8(state); + datum.counters = EventWriter.toAvro(counters); + + datum.clockSplits = AvroArrayUtils.toAvro(ProgressSplitsBlock + .arrayGetWallclockTime(allSplits)); + datum.cpuUsages = AvroArrayUtils.toAvro(ProgressSplitsBlock + .arrayGetCPUTime(allSplits)); + datum.vMemKbytes = AvroArrayUtils.toAvro(ProgressSplitsBlock + .arrayGetVMemKbytes(allSplits)); + datum.physMemKbytes = AvroArrayUtils.toAvro(ProgressSplitsBlock + .arrayGetPhysMemKbytes(allSplits)); + } + return datum; + } + + public void setDatum(Object oDatum) { + this.datum = (MapAttemptFinished)oDatum; + this.attemptId = TaskAttemptID.forName(datum.attemptId.toString()); + this.taskType = TaskType.valueOf(datum.taskType.toString()); + this.taskStatus = datum.taskStatus.toString(); + this.mapFinishTime = datum.mapFinishTime; + this.finishTime = datum.finishTime; + this.hostname = datum.hostname.toString(); + this.rackName = datum.rackname.toString(); + this.port = datum.port; + this.state = datum.state.toString(); + this.counters = EventReader.fromAvro(datum.counters); + this.clockSplits = AvroArrayUtils.fromAvro(datum.clockSplits); + this.cpuUsages = AvroArrayUtils.fromAvro(datum.cpuUsages); + this.vMemKbytes = AvroArrayUtils.fromAvro(datum.vMemKbytes); + this.physMemKbytes = AvroArrayUtils.fromAvro(datum.physMemKbytes); } /** Get the task ID */ - public TaskID getTaskId() { return TaskID.forName(datum.taskid.toString()); } + public TaskID getTaskId() { return attemptId.getTaskID(); } /** Get the attempt id */ public TaskAttemptID getAttemptId() { - return TaskAttemptID.forName(datum.attemptId.toString()); + return attemptId; } + /** Get the task type */ public TaskType getTaskType() { - return TaskType.valueOf(datum.taskType.toString()); + return TaskType.valueOf(taskType.toString()); } /** Get the task status */ - public String getTaskStatus() { return datum.taskStatus.toString(); } + public String getTaskStatus() { return taskStatus.toString(); } /** Get the map phase finish time */ - public long getMapFinishTime() { return datum.mapFinishTime; } + public long getMapFinishTime() { return mapFinishTime; } /** Get the attempt finish time */ - public long getFinishTime() { return datum.finishTime; } + public long getFinishTime() { return finishTime; } /** Get the host name */ - public String getHostname() { return datum.hostname.toString(); } + public String getHostname() { return hostname.toString(); } /** Get the tracker rpc port */ - public int getPort() { return datum.port; } + public int getPort() { return port; } /** Get the rack name */ public String getRackName() { - return datum.rackname == null ? null : datum.rackname.toString(); + return rackName == null ? null : rackName.toString(); } /** Get the state string */ - public String getState() { return datum.state.toString(); } + public String getState() { return state.toString(); } /** Get the counters */ - Counters getCounters() { return EventReader.fromAvro(datum.counters); } + Counters getCounters() { return counters; } /** Get the event type */ public EventType getEventType() { return EventType.MAP_ATTEMPT_FINISHED; } public int[] getClockSplits() { - return AvroArrayUtils.fromAvro(datum.clockSplits); + return clockSplits; } public int[] getCpuUsages() { - return AvroArrayUtils.fromAvro(datum.cpuUsages); + return cpuUsages; } public int[] getVMemKbytes() { - return AvroArrayUtils.fromAvro(datum.vMemKbytes); + return vMemKbytes; } public int[] getPhysMemKbytes() { - return AvroArrayUtils.fromAvro(datum.physMemKbytes); + return physMemKbytes; } } diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/jobhistory/ReduceAttemptFinishedEvent.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/jobhistory/ReduceAttemptFinishedEvent.java index 99b2212316c..10b8c1f0139 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/jobhistory/ReduceAttemptFinishedEvent.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/jobhistory/ReduceAttemptFinishedEvent.java @@ -34,8 +34,25 @@ import org.apache.hadoop.mapreduce.TaskType; @InterfaceAudience.Private @InterfaceStability.Unstable public class ReduceAttemptFinishedEvent implements HistoryEvent { - private ReduceAttemptFinished datum = - new ReduceAttemptFinished(); + + private ReduceAttemptFinished datum = null; + + private TaskAttemptID attemptId; + private TaskType taskType; + private String taskStatus; + private long shuffleFinishTime; + private long sortFinishTime; + private long finishTime; + private String hostname; + private String rackName; + private int port; + private String state; + private Counters counters; + int[][] allSplits; + int[] clockSplits; + int[] cpuUsages; + int[] vMemKbytes; + int[] physMemKbytes; /** * Create an event to record completion of a reduce attempt @@ -60,33 +77,22 @@ public class ReduceAttemptFinishedEvent implements HistoryEvent { long shuffleFinishTime, long sortFinishTime, long finishTime, String hostname, int port, String rackName, String state, Counters counters, int[][] allSplits) { - datum.taskid = new Utf8(id.getTaskID().toString()); - datum.attemptId = new Utf8(id.toString()); - datum.taskType = new Utf8(taskType.name()); - datum.taskStatus = new Utf8(taskStatus); - datum.shuffleFinishTime = shuffleFinishTime; - datum.sortFinishTime = sortFinishTime; - datum.finishTime = finishTime; - datum.hostname = new Utf8(hostname); - datum.port = port; - if (rackName != null) { - datum.rackname = new Utf8(rackName); - } - datum.state = new Utf8(state); - datum.counters = EventWriter.toAvro(counters); - - datum.clockSplits - = AvroArrayUtils.toAvro - (ProgressSplitsBlock.arrayGetWallclockTime(allSplits)); - datum.cpuUsages - = AvroArrayUtils.toAvro - (ProgressSplitsBlock.arrayGetCPUTime(allSplits)); - datum.vMemKbytes - = AvroArrayUtils.toAvro - (ProgressSplitsBlock.arrayGetVMemKbytes(allSplits)); - datum.physMemKbytes - = AvroArrayUtils.toAvro - (ProgressSplitsBlock.arrayGetPhysMemKbytes(allSplits)); + this.attemptId = id; + this.taskType = taskType; + this.taskStatus = taskStatus; + this.shuffleFinishTime = shuffleFinishTime; + this.sortFinishTime = sortFinishTime; + this.finishTime = finishTime; + this.hostname = hostname; + this.rackName = rackName; + this.port = port; + this.state = state; + this.counters = counters; + this.allSplits = allSplits; + this.clockSplits = ProgressSplitsBlock.arrayGetWallclockTime(allSplits); + this.cpuUsages = ProgressSplitsBlock.arrayGetCPUTime(allSplits); + this.vMemKbytes = ProgressSplitsBlock.arrayGetVMemKbytes(allSplits); + this.physMemKbytes = ProgressSplitsBlock.arrayGetPhysMemKbytes(allSplits); } /** @@ -117,43 +123,87 @@ public class ReduceAttemptFinishedEvent implements HistoryEvent { ReduceAttemptFinishedEvent() {} - public Object getDatum() { return datum; } - public void setDatum(Object datum) { - this.datum = (ReduceAttemptFinished)datum; + public Object getDatum() { + if (datum == null) { + datum = new ReduceAttemptFinished(); + datum.taskid = new Utf8(attemptId.getTaskID().toString()); + datum.attemptId = new Utf8(attemptId.toString()); + datum.taskType = new Utf8(taskType.name()); + datum.taskStatus = new Utf8(taskStatus); + datum.shuffleFinishTime = shuffleFinishTime; + datum.sortFinishTime = sortFinishTime; + datum.finishTime = finishTime; + datum.hostname = new Utf8(hostname); + datum.port = port; + if (rackName != null) { + datum.rackname = new Utf8(rackName); + } + datum.state = new Utf8(state); + datum.counters = EventWriter.toAvro(counters); + + datum.clockSplits = AvroArrayUtils.toAvro(ProgressSplitsBlock + .arrayGetWallclockTime(allSplits)); + datum.cpuUsages = AvroArrayUtils.toAvro(ProgressSplitsBlock + .arrayGetCPUTime(allSplits)); + datum.vMemKbytes = AvroArrayUtils.toAvro(ProgressSplitsBlock + .arrayGetVMemKbytes(allSplits)); + datum.physMemKbytes = AvroArrayUtils.toAvro(ProgressSplitsBlock + .arrayGetPhysMemKbytes(allSplits)); + } + return datum; + } + + public void setDatum(Object oDatum) { + this.datum = (ReduceAttemptFinished)oDatum; + this.attemptId = TaskAttemptID.forName(datum.attemptId.toString()); + this.taskType = TaskType.valueOf(datum.taskType.toString()); + this.taskStatus = datum.taskStatus.toString(); + this.shuffleFinishTime = datum.shuffleFinishTime; + this.sortFinishTime = datum.sortFinishTime; + this.finishTime = datum.finishTime; + this.hostname = datum.hostname.toString(); + this.rackName = datum.rackname.toString(); + this.port = datum.port; + this.state = datum.state.toString(); + this.counters = EventReader.fromAvro(datum.counters); + this.clockSplits = AvroArrayUtils.fromAvro(datum.clockSplits); + this.cpuUsages = AvroArrayUtils.fromAvro(datum.cpuUsages); + this.vMemKbytes = AvroArrayUtils.fromAvro(datum.vMemKbytes); + this.physMemKbytes = AvroArrayUtils.fromAvro(datum.physMemKbytes); } /** Get the Task ID */ - public TaskID getTaskId() { return TaskID.forName(datum.taskid.toString()); } + public TaskID getTaskId() { return attemptId.getTaskID(); } /** Get the attempt id */ public TaskAttemptID getAttemptId() { - return TaskAttemptID.forName(datum.attemptId.toString()); + return TaskAttemptID.forName(attemptId.toString()); } /** Get the task type */ public TaskType getTaskType() { - return TaskType.valueOf(datum.taskType.toString()); + return TaskType.valueOf(taskType.toString()); } /** Get the task status */ - public String getTaskStatus() { return datum.taskStatus.toString(); } + public String getTaskStatus() { return taskStatus.toString(); } /** Get the finish time of the sort phase */ - public long getSortFinishTime() { return datum.sortFinishTime; } + public long getSortFinishTime() { return sortFinishTime; } /** Get the finish time of the shuffle phase */ - public long getShuffleFinishTime() { return datum.shuffleFinishTime; } + public long getShuffleFinishTime() { return shuffleFinishTime; } /** Get the finish time of the attempt */ - public long getFinishTime() { return datum.finishTime; } + public long getFinishTime() { return finishTime; } /** Get the name of the host where the attempt ran */ - public String getHostname() { return datum.hostname.toString(); } + public String getHostname() { return hostname.toString(); } /** Get the tracker rpc port */ - public int getPort() { return datum.port; } + public int getPort() { return port; } /** Get the rack name of the node where the attempt ran */ public String getRackName() { - return datum.rackname == null ? null : datum.rackname.toString(); + return rackName == null ? null : rackName.toString(); } /** Get the state string */ - public String getState() { return datum.state.toString(); } + public String getState() { return state.toString(); } /** Get the counters for the attempt */ - Counters getCounters() { return EventReader.fromAvro(datum.counters); } + Counters getCounters() { return counters; } /** Get the event type */ public EventType getEventType() { return EventType.REDUCE_ATTEMPT_FINISHED; @@ -161,16 +211,16 @@ public class ReduceAttemptFinishedEvent implements HistoryEvent { public int[] getClockSplits() { - return AvroArrayUtils.fromAvro(datum.clockSplits); + return clockSplits; } public int[] getCpuUsages() { - return AvroArrayUtils.fromAvro(datum.cpuUsages); + return cpuUsages; } public int[] getVMemKbytes() { - return AvroArrayUtils.fromAvro(datum.vMemKbytes); + return vMemKbytes; } public int[] getPhysMemKbytes() { - return AvroArrayUtils.fromAvro(datum.physMemKbytes); + return physMemKbytes; } } diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/jobhistory/TaskAttemptFinishedEvent.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/jobhistory/TaskAttemptFinishedEvent.java index 9938182fc6c..a62ca38e4a8 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/jobhistory/TaskAttemptFinishedEvent.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/jobhistory/TaskAttemptFinishedEvent.java @@ -18,8 +18,7 @@ package org.apache.hadoop.mapreduce.jobhistory; -import java.io.IOException; - +import org.apache.avro.util.Utf8; import org.apache.hadoop.classification.InterfaceAudience; import org.apache.hadoop.classification.InterfaceStability; import org.apache.hadoop.mapreduce.Counters; @@ -27,8 +26,6 @@ import org.apache.hadoop.mapreduce.TaskAttemptID; import org.apache.hadoop.mapreduce.TaskID; import org.apache.hadoop.mapreduce.TaskType; -import org.apache.avro.util.Utf8; - /** * Event to record successful task completion * @@ -36,7 +33,17 @@ import org.apache.avro.util.Utf8; @InterfaceAudience.Private @InterfaceStability.Unstable public class TaskAttemptFinishedEvent implements HistoryEvent { - private TaskAttemptFinished datum = new TaskAttemptFinished(); + + private TaskAttemptFinished datum = null; + + private TaskAttemptID attemptId; + private TaskType taskType; + private String taskStatus; + private long finishTime; + private String rackName; + private String hostname; + private String state; + private Counters counters; /** * Create an event to record successful finishes for setup and cleanup @@ -53,52 +60,73 @@ public class TaskAttemptFinishedEvent implements HistoryEvent { TaskType taskType, String taskStatus, long finishTime, String rackName, String hostname, String state, Counters counters) { - datum.taskid = new Utf8(id.getTaskID().toString()); - datum.attemptId = new Utf8(id.toString()); - datum.taskType = new Utf8(taskType.name()); - datum.taskStatus = new Utf8(taskStatus); - datum.finishTime = finishTime; - if (rackName != null) { - datum.rackname = new Utf8(rackName); - } - datum.hostname = new Utf8(hostname); - datum.state = new Utf8(state); - datum.counters = EventWriter.toAvro(counters); + this.attemptId = id; + this.taskType = taskType; + this.taskStatus = taskStatus; + this.finishTime = finishTime; + this.rackName = rackName; + this.hostname = hostname; + this.state = state; + this.counters = counters; } TaskAttemptFinishedEvent() {} - public Object getDatum() { return datum; } - public void setDatum(Object datum) { - this.datum = (TaskAttemptFinished)datum; + public Object getDatum() { + if (datum == null) { + datum = new TaskAttemptFinished(); + datum.taskid = new Utf8(attemptId.getTaskID().toString()); + datum.attemptId = new Utf8(attemptId.toString()); + datum.taskType = new Utf8(taskType.name()); + datum.taskStatus = new Utf8(taskStatus); + datum.finishTime = finishTime; + if (rackName != null) { + datum.rackname = new Utf8(rackName); + } + datum.hostname = new Utf8(hostname); + datum.state = new Utf8(state); + datum.counters = EventWriter.toAvro(counters); + } + return datum; + } + public void setDatum(Object oDatum) { + this.datum = (TaskAttemptFinished)oDatum; + this.attemptId = TaskAttemptID.forName(datum.attemptId.toString()); + this.taskType = TaskType.valueOf(datum.taskType.toString()); + this.taskStatus = datum.taskStatus.toString(); + this.finishTime = datum.finishTime; + this.rackName = datum.rackname.toString(); + this.hostname = datum.hostname.toString(); + this.state = datum.state.toString(); + this.counters = EventReader.fromAvro(datum.counters); } /** Get the task ID */ - public TaskID getTaskId() { return TaskID.forName(datum.taskid.toString()); } + public TaskID getTaskId() { return attemptId.getTaskID(); } /** Get the task attempt id */ public TaskAttemptID getAttemptId() { - return TaskAttemptID.forName(datum.attemptId.toString()); + return TaskAttemptID.forName(attemptId.toString()); } /** Get the task type */ public TaskType getTaskType() { - return TaskType.valueOf(datum.taskType.toString()); + return TaskType.valueOf(taskType.toString()); } /** Get the task status */ - public String getTaskStatus() { return datum.taskStatus.toString(); } + public String getTaskStatus() { return taskStatus.toString(); } /** Get the attempt finish time */ - public long getFinishTime() { return datum.finishTime; } + public long getFinishTime() { return finishTime; } /** Get the host where the attempt executed */ - public String getHostname() { return datum.hostname.toString(); } + public String getHostname() { return hostname.toString(); } /** Get the rackname where the attempt executed */ public String getRackName() { - return datum.rackname == null ? null : datum.rackname.toString(); + return rackName == null ? null : rackName.toString(); } /** Get the state string */ - public String getState() { return datum.state.toString(); } + public String getState() { return state.toString(); } /** Get the counters for the attempt */ - Counters getCounters() { return EventReader.fromAvro(datum.counters); } + Counters getCounters() { return counters; } /** Get the event type */ public EventType getEventType() { // Note that the task type can be setup/map/reduce/cleanup but the diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/jobhistory/TaskFinishedEvent.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/jobhistory/TaskFinishedEvent.java index d5e006b776a..35399709bfa 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/jobhistory/TaskFinishedEvent.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/jobhistory/TaskFinishedEvent.java @@ -18,16 +18,13 @@ package org.apache.hadoop.mapreduce.jobhistory; -import java.io.IOException; - +import org.apache.avro.util.Utf8; import org.apache.hadoop.classification.InterfaceAudience; import org.apache.hadoop.classification.InterfaceStability; import org.apache.hadoop.mapreduce.Counters; import org.apache.hadoop.mapreduce.TaskID; import org.apache.hadoop.mapreduce.TaskType; -import org.apache.avro.util.Utf8; - /** * Event to record the successful completion of a task * @@ -35,7 +32,14 @@ import org.apache.avro.util.Utf8; @InterfaceAudience.Private @InterfaceStability.Unstable public class TaskFinishedEvent implements HistoryEvent { - private TaskFinished datum = new TaskFinished(); + + private TaskFinished datum = null; + + private TaskID taskid; + private long finishTime; + private TaskType taskType; + private String status; + private Counters counters; /** * Create an event to record the successful completion of a task @@ -48,32 +52,48 @@ public class TaskFinishedEvent implements HistoryEvent { public TaskFinishedEvent(TaskID id, long finishTime, TaskType taskType, String status, Counters counters) { - datum.taskid = new Utf8(id.toString()); - datum.finishTime = finishTime; - datum.counters = EventWriter.toAvro(counters); - datum.taskType = new Utf8(taskType.name()); - datum.status = new Utf8(status); + this.taskid = id; + this.finishTime = finishTime; + this.taskType = taskType; + this.status = status; + this.counters = counters; } TaskFinishedEvent() {} - public Object getDatum() { return datum; } - public void setDatum(Object datum) { - this.datum = (TaskFinished)datum; + public Object getDatum() { + if (datum == null) { + datum = new TaskFinished(); + datum.taskid = new Utf8(taskid.toString()); + datum.finishTime = finishTime; + datum.counters = EventWriter.toAvro(counters); + datum.taskType = new Utf8(taskType.name()); + datum.status = new Utf8(status); + } + return datum; + } + + public void setDatum(Object oDatum) { + this.datum = (TaskFinished)oDatum; + this.taskid = TaskID.forName(datum.taskid.toString()); + this.finishTime = datum.finishTime; + this.taskType = TaskType.valueOf(datum.taskType.toString()); + this.status = datum.status.toString(); + this.counters = EventReader.fromAvro(datum.counters); } /** Get task id */ - public TaskID getTaskId() { return TaskID.forName(datum.taskid.toString()); } + public TaskID getTaskId() { return TaskID.forName(taskid.toString()); } /** Get the task finish time */ - public long getFinishTime() { return datum.finishTime; } + public long getFinishTime() { return finishTime; } /** Get task counters */ - public Counters getCounters() { return EventReader.fromAvro(datum.counters); } + public Counters getCounters() { return counters; } /** Get task type */ public TaskType getTaskType() { - return TaskType.valueOf(datum.taskType.toString()); + return TaskType.valueOf(taskType.toString()); } /** Get task status */ - public String getTaskStatus() { return datum.status.toString(); } + public String getTaskStatus() { return status.toString(); } /** Get event type */ public EventType getEventType() { return EventType.TASK_FINISHED; diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/main/java/org/apache/hadoop/mapreduce/v2/hs/CompletedJob.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/main/java/org/apache/hadoop/mapreduce/v2/hs/CompletedJob.java index fd828d76693..c3afb013c5e 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/main/java/org/apache/hadoop/mapreduce/v2/hs/CompletedJob.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/main/java/org/apache/hadoop/mapreduce/v2/hs/CompletedJob.java @@ -32,13 +32,13 @@ import org.apache.commons.logging.LogFactory; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.Path; import org.apache.hadoop.mapred.JobACLsManager; +import org.apache.hadoop.mapreduce.Counters; import org.apache.hadoop.mapreduce.JobACL; import org.apache.hadoop.mapreduce.TypeConverter; import org.apache.hadoop.mapreduce.jobhistory.JobHistoryParser; import org.apache.hadoop.mapreduce.jobhistory.JobHistoryParser.JobInfo; import org.apache.hadoop.mapreduce.jobhistory.JobHistoryParser.TaskInfo; import org.apache.hadoop.mapreduce.v2.api.records.AMInfo; -import org.apache.hadoop.mapreduce.v2.api.records.Counters; import org.apache.hadoop.mapreduce.v2.api.records.JobId; import org.apache.hadoop.mapreduce.v2.api.records.JobReport; import org.apache.hadoop.mapreduce.v2.api.records.JobState; @@ -89,7 +89,7 @@ public class CompletedJob implements org.apache.hadoop.mapreduce.v2.app.job.Job loadFullHistoryData(loadTasks, historyFile); user = userName; - counters = TypeConverter.toYarn(jobInfo.getTotalCounters()); + counters = jobInfo.getTotalCounters(); diagnostics.add(jobInfo.getErrorInfo()); report = RecordFactoryProvider.getRecordFactory(null).newRecordInstance( @@ -121,7 +121,7 @@ public class CompletedJob implements org.apache.hadoop.mapreduce.v2.app.job.Job } @Override - public Counters getCounters() { + public Counters getAllCounters() { return counters; } diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/main/java/org/apache/hadoop/mapreduce/v2/hs/CompletedTask.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/main/java/org/apache/hadoop/mapreduce/v2/hs/CompletedTask.java index b02f6c59d9c..ced2ddb9375 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/main/java/org/apache/hadoop/mapreduce/v2/hs/CompletedTask.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/main/java/org/apache/hadoop/mapreduce/v2/hs/CompletedTask.java @@ -24,10 +24,10 @@ import java.util.Map; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.mapreduce.Counters; import org.apache.hadoop.mapreduce.TypeConverter; import org.apache.hadoop.mapreduce.jobhistory.JobHistoryParser.TaskAttemptInfo; import org.apache.hadoop.mapreduce.jobhistory.JobHistoryParser.TaskInfo; -import org.apache.hadoop.mapreduce.v2.api.records.Counters; import org.apache.hadoop.mapreduce.v2.api.records.TaskAttemptId; import org.apache.hadoop.mapreduce.v2.api.records.TaskId; import org.apache.hadoop.mapreduce.v2.api.records.TaskReport; @@ -60,7 +60,7 @@ public class CompletedTask implements Task { this.finishTime = taskInfo.getFinishTime(); this.type = TypeConverter.toYarn(taskInfo.getTaskType()); if (taskInfo.getCounters() != null) - this.counters = TypeConverter.toYarn(taskInfo.getCounters()); + this.counters = taskInfo.getCounters(); if (taskInfo.getTaskStatus() != null) { this.state = TaskState.valueOf(taskInfo.getTaskStatus()); } else { @@ -86,7 +86,7 @@ public class CompletedTask implements Task { report.setFinishTime(finishTime); report.setTaskState(state); report.setProgress(getProgress()); - report.setCounters(getCounters()); + report.setCounters(TypeConverter.toYarn(getCounters())); report.addAllRunningAttempts(new ArrayList(attempts.keySet())); } diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/main/java/org/apache/hadoop/mapreduce/v2/hs/CompletedTaskAttempt.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/main/java/org/apache/hadoop/mapreduce/v2/hs/CompletedTaskAttempt.java index fd3ea174b32..09819c39228 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/main/java/org/apache/hadoop/mapreduce/v2/hs/CompletedTaskAttempt.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/main/java/org/apache/hadoop/mapreduce/v2/hs/CompletedTaskAttempt.java @@ -21,9 +21,9 @@ package org.apache.hadoop.mapreduce.v2.hs; import java.util.ArrayList; import java.util.List; +import org.apache.hadoop.mapreduce.Counters; import org.apache.hadoop.mapreduce.TypeConverter; import org.apache.hadoop.mapreduce.jobhistory.JobHistoryParser.TaskAttemptInfo; -import org.apache.hadoop.mapreduce.v2.api.records.Counters; import org.apache.hadoop.mapreduce.v2.api.records.TaskAttemptId; import org.apache.hadoop.mapreduce.v2.api.records.TaskAttemptReport; import org.apache.hadoop.mapreduce.v2.api.records.TaskAttemptState; @@ -46,8 +46,9 @@ public class CompletedTaskAttempt implements TaskAttempt { CompletedTaskAttempt(TaskId taskId, TaskAttemptInfo attemptInfo) { this.attemptInfo = attemptInfo; this.attemptId = TypeConverter.toYarn(attemptInfo.getAttemptId()); - if (attemptInfo.getCounters() != null) - this.counters = TypeConverter.toYarn(attemptInfo.getCounters()); + if (attemptInfo.getCounters() != null) { + this.counters = attemptInfo.getCounters(); + } if (attemptInfo.getTaskStatus() != null) { this.state = TaskAttemptState.valueOf(attemptInfo.getTaskStatus()); } else { @@ -61,7 +62,6 @@ public class CompletedTaskAttempt implements TaskAttempt { } report = RecordFactoryProvider.getRecordFactory(null).newRecordInstance(TaskAttemptReport.class); - report.setCounters(counters); report.setTaskAttemptId(attemptId); report.setTaskAttemptState(state); @@ -78,7 +78,7 @@ public class CompletedTaskAttempt implements TaskAttempt { } // report.setPhase(attemptInfo.get); //TODO report.setStateString(attemptInfo.getState()); - report.setCounters(getCounters()); + report.setCounters(TypeConverter.toYarn(getCounters())); report.setContainerId(attemptInfo.getContainerId()); if (attemptInfo.getHostname() == null) { report.setNodeManagerHost("UNKNOWN"); diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/main/java/org/apache/hadoop/mapreduce/v2/hs/HistoryClientService.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/main/java/org/apache/hadoop/mapreduce/v2/hs/HistoryClientService.java index c4af33ad58b..b2142f90d12 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/main/java/org/apache/hadoop/mapreduce/v2/hs/HistoryClientService.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/main/java/org/apache/hadoop/mapreduce/v2/hs/HistoryClientService.java @@ -33,6 +33,7 @@ import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.CommonConfigurationKeysPublic; import org.apache.hadoop.ipc.Server; import org.apache.hadoop.mapreduce.JobACL; +import org.apache.hadoop.mapreduce.TypeConverter; import org.apache.hadoop.mapreduce.v2.api.MRClientProtocol; import org.apache.hadoop.mapreduce.v2.api.protocolrecords.FailTaskAttemptRequest; import org.apache.hadoop.mapreduce.v2.api.protocolrecords.FailTaskAttemptResponse; @@ -190,7 +191,7 @@ public class HistoryClientService extends AbstractService { JobId jobId = request.getJobId(); Job job = verifyAndGetJob(jobId); GetCountersResponse response = recordFactory.newRecordInstance(GetCountersResponse.class); - response.setCounters(job.getCounters()); + response.setCounters(TypeConverter.toYarn(job.getAllCounters())); return response; } diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/main/java/org/apache/hadoop/mapreduce/v2/hs/PartialJob.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/main/java/org/apache/hadoop/mapreduce/v2/hs/PartialJob.java index 1b1b3b1cedf..dac0808e576 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/main/java/org/apache/hadoop/mapreduce/v2/hs/PartialJob.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/main/java/org/apache/hadoop/mapreduce/v2/hs/PartialJob.java @@ -22,9 +22,9 @@ import java.util.List; import java.util.Map; import org.apache.hadoop.fs.Path; +import org.apache.hadoop.mapreduce.Counters; import org.apache.hadoop.mapreduce.JobACL; import org.apache.hadoop.mapreduce.v2.api.records.AMInfo; -import org.apache.hadoop.mapreduce.v2.api.records.Counters; import org.apache.hadoop.mapreduce.v2.api.records.JobId; import org.apache.hadoop.mapreduce.v2.api.records.JobReport; import org.apache.hadoop.mapreduce.v2.api.records.JobState; @@ -95,7 +95,7 @@ public class PartialJob implements org.apache.hadoop.mapreduce.v2.app.job.Job { } @Override - public Counters getCounters() { + public Counters getAllCounters() { return null; } diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/test/java/org/apache/hadoop/mapreduce/v2/hs/webapp/TestHsWebServicesAttempts.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/test/java/org/apache/hadoop/mapreduce/v2/hs/webapp/TestHsWebServicesAttempts.java index 4cc0b3259f5..6fdb94d9029 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/test/java/org/apache/hadoop/mapreduce/v2/hs/webapp/TestHsWebServicesAttempts.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/test/java/org/apache/hadoop/mapreduce/v2/hs/webapp/TestHsWebServicesAttempts.java @@ -699,7 +699,7 @@ public class TestHsWebServicesAttempts extends JerseyTest { assertTrue("name not set", (name != null && !name.isEmpty())); JSONArray counters = counterGroup.getJSONArray("counter"); for (int j = 0; j < counters.length(); j++) { - JSONObject counter = counters.getJSONObject(i); + JSONObject counter = counters.getJSONObject(j); String counterName = counter.getString("name"); assertTrue("name not set", (counterName != null && !counterName.isEmpty())); diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/test/java/org/apache/hadoop/mapreduce/v2/hs/webapp/TestHsWebServicesTasks.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/test/java/org/apache/hadoop/mapreduce/v2/hs/webapp/TestHsWebServicesTasks.java index 3dbe860c182..b0780aff5a1 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/test/java/org/apache/hadoop/mapreduce/v2/hs/webapp/TestHsWebServicesTasks.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/test/java/org/apache/hadoop/mapreduce/v2/hs/webapp/TestHsWebServicesTasks.java @@ -788,7 +788,7 @@ public class TestHsWebServicesTasks extends JerseyTest { assertTrue("name not set", (name != null && !name.isEmpty())); JSONArray counters = counterGroup.getJSONArray("counter"); for (int j = 0; j < counters.length(); j++) { - JSONObject counter = counters.getJSONObject(i); + JSONObject counter = counters.getJSONObject(j); String counterName = counter.getString("name"); assertTrue("name not set", (counterName != null && !counterName.isEmpty()));