+ * <p>An easy-to-understand example of this kind of quantity would be
+ *  a distance traveled.  It makes sense to consider that portion of
+ *  the total travel that can be apportioned to each bucket.
+ *
+ */
+class CumulativePeriodicStats extends PeriodicStatsAccumulator {
+  // int's are acceptable here, even though times are normally
+  //  long's, because these are a difference and an int won't
+  //  overflow for 24 days.  Tasks can't run for more than about a
+  //  week for other reasons.
+  int previousValue = 0;
+
+  CumulativePeriodicStats(int count) {
+    super(count);
+  }
+
+  /**
+   *
+   * accumulates a new reading by keeping a running account of the
+   *  value distance from the beginning of the bucket to the end of
+   *  this reading
+   */
+  @Override
+  protected void extendInternal(double newProgress, int newValue) {
+    if (state == null) {
+      return;
+    }
+
+    state.currentAccumulation += (double)(newValue - previousValue);
+    previousValue = newValue;
+  }
+}
diff --git a/mapreduce/src/java/org/apache/hadoop/mapred/JobInProgress.java b/mapreduce/src/java/org/apache/hadoop/mapred/JobInProgress.java
index 51b33292842..70a7ca6fd04 100644
--- a/mapreduce/src/java/org/apache/hadoop/mapred/JobInProgress.java
+++ b/mapreduce/src/java/org/apache/hadoop/mapred/JobInProgress.java
@@ -2673,25 +2673,29 @@ public class JobInProgress {
         status.getTaskTracker(), ttStatus.getHttpPort());
     jobHistory.logEvent(tse, status.getTaskID().getJobID());
-
+    TaskAttemptID statusAttemptID = status.getTaskID();
     if (status.getIsMap()){
       MapAttemptFinishedEvent mfe = new MapAttemptFinishedEvent(
-          status.getTaskID(), taskType, TaskStatus.State.SUCCEEDED.toString(),
+          statusAttemptID, taskType, TaskStatus.State.SUCCEEDED.toString(),
          status.getMapFinishTime(),
          status.getFinishTime(), trackerHostname,
          status.getStateString(),
-         new org.apache.hadoop.mapreduce.Counters(status.getCounters()));
+         new org.apache.hadoop.mapreduce.Counters(status.getCounters()),
+         tip.getSplits(statusAttemptID).burst()
+         );
      jobHistory.logEvent(mfe, status.getTaskID().getJobID());
     }else{
       ReduceAttemptFinishedEvent rfe = new ReduceAttemptFinishedEvent(
-          status.getTaskID(), taskType, TaskStatus.State.SUCCEEDED.toString(),
+          statusAttemptID, taskType, TaskStatus.State.SUCCEEDED.toString(),
          status.getShuffleFinishTime(),
          status.getSortFinishTime(), status.getFinishTime(),
          trackerHostname, status.getStateString(),
-         new org.apache.hadoop.mapreduce.Counters(status.getCounters()));
+         new org.apache.hadoop.mapreduce.Counters(status.getCounters()),
+         tip.getSplits(statusAttemptID).burst()
+         );
      jobHistory.logEvent(rfe, status.getTaskID().getJobID());
@@ -3171,12 +3175,16 @@ public class JobInProgress {
         taskid, taskType, startTime, taskTrackerName, taskTrackerPort);
     jobHistory.logEvent(tse, taskid.getJobID());
+
+    ProgressSplitsBlock splits = tip.getSplits(taskStatus.getTaskID());
-    TaskAttemptUnsuccessfulCompletionEvent tue =
-      new TaskAttemptUnsuccessfulCompletionEvent(taskid,
-          taskType, taskStatus.getRunState().toString(),
-          finishTime,
-          taskTrackerHostName, diagInfo);
+    TaskAttemptUnsuccessfulCompletionEvent tue =
+      new TaskAttemptUnsuccessfulCompletionEvent
+            (taskid,
+             taskType, taskStatus.getRunState().toString(),
+             finishTime,
+             taskTrackerHostName, diagInfo,
+             splits.burst());
     jobHistory.logEvent(tue, taskid.getJobID());

     // After this, try to assign tasks with the one after this, so that
diff --git a/mapreduce/src/java/org/apache/hadoop/mapred/PeriodicStatsAccumulator.java b/mapreduce/src/java/org/apache/hadoop/mapred/PeriodicStatsAccumulator.java
new file mode 100644
index 00000000000..85ee8a544e7
--- /dev/null
+++ b/mapreduce/src/java/org/apache/hadoop/mapred/PeriodicStatsAccumulator.java
@@ -0,0 +1,205 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.mapred;
+
+/**
+ *
+ * This abstract class represents a bucketed series of
+ *  measurements of a quantity being measured in a running task
+ *  attempt.
+ *
+ *
+ * <p>The sole constructor is called with a count, which is the
+ *  number of buckets into which we evenly divide the spectrum of
+ *  progress from 0.0D to 1.0D .  In the future we may provide for
+ *  custom split points that don't have to be uniform.
+ *
+ * <p>A subclass determines how we fold readings for portions of a
+ *  bucket and how we interpret the readings by overriding
+ *  {@code extendInternal(...)} and {@code initializeInterval()}
+ */
+public abstract class PeriodicStatsAccumulator {
+  // The range of progress from 0.0D through 1.0D is divided into
+  //  count "progress segments".  This object accumulates an
+  //  estimate of the effective value of a time-varying value during
+  //  the zero-based i'th progress segment, ranging from i/count
+  //  through (i+1)/count .
+  // This is an abstract class.  We have two implementations: one
+  //  for monotonically increasing time-dependent variables
+  //  [currently, CPU time in milliseconds and wallclock time in
+  //  milliseconds] and one for quantities that can vary arbitrarily
+  //  over time, currently virtual and physical memory used, in
+  //  kilobytes.
+  // We carry int's here.  This saves a lot of JVM heap space in the
+  //  job tracker per running task attempt [200 bytes per] but it
+  //  has a small downside: no task attempt can run for more than
+  //  57 days nor occupy more than two terabytes of virtual memory.
+  protected final int count;
+  protected final int[] values;
+
+  static class StatsetState {
+    int oldValue = 0;
+    double oldProgress = 0.0D;
+
+    double currentAccumulation = 0.0D;
+  }
+
+  // We provide this level of indirection to reduce the memory
+  //  footprint of done task attempts.  When a task's progress
+  //  reaches 1.0D, we delete this StatsetState object.
+  StatsetState state = new StatsetState();
+
+  PeriodicStatsAccumulator(int count) {
+    this.count = count;
+    this.values = new int[count];
+    for (int i = 0; i < count; ++i) {
+      values[i] = -1;
+    }
+  }
+
+  protected int[] getValues() {
+    return values;
+  }
+
+  // The concrete implementation of this abstract function
+  //  accumulates more data into the current progress segment.
+  //  newProgress [from the call] and oldProgress [from the object]
+  //  must be in [or at the border of] a single progress segment.
+  /**
+   *
+   * adds a new reading to the current bucket.
+   *
+   * @param newProgress the endpoint of the interval this new
+   *                     reading covers
+   * @param newValue the value of the reading at {@code newProgress}
+   *
+   * The class has three instance variables: {@code oldProgress},
+   *  {@code oldValue}, and {@code currentAccumulation}.
+   *
+   * {@code extendInternal} can count on three things:
+   *
+   *   1: The first time it's called in a particular instance, both
+   *      oldXXX's will be zero.
+   *
+   *   2: oldXXX for a later call is the value of newXXX of the
+   *      previous call.  This ensures continuity in accumulation from
+   *      one call to the next.
+   *
+   *   3: {@code currentAccumulation} is owned by
+   *      {@code initializeInterval} and {@code extendInternal}.
+   */
+  protected abstract void extendInternal(double newProgress, int newValue);
+
+  // What has to be done when you open a new interval
+  /**
+   * initializes the state variables to be ready for a new interval
+   */
+  protected void initializeInterval() {
+    state.currentAccumulation = 0.0D;
+  }
+
+  // called for each new reading
+  /**
+   * This method calls {@code extendInternal} at least once.  It
+   *  divides the current progress interval [from the last call's
+   *  {@code newProgress} to this call's {@code newProgress} ]
+   *  into one or more subintervals by splitting at any point which
+   *  is an interval boundary if there are any such points.  It
+   *  then calls {@code extendInternal} for each subinterval, or the
+   *  whole interval if there are no splitting points.
+   *
+   *
+   * <p>For example, if the value was {@code 300} last time with
+   *  {@code 0.3} progress, and count is {@code 5}, and you get a
+   *  new reading with the variable at {@code 700} and progress at
+   *  {@code 0.7}, you get three calls to {@code extendInternal}:
+   *  one extending from progress {@code 0.3} to {@code 0.4} [the
+   *  next boundary] with a value of {@code 400}, the next one
+   *  through {@code 0.6} with a value of {@code 600}, and finally
+   *  one at {@code 700} with a progress of {@code 0.7} .
+   *
+   * @param newProgress the endpoint of the progress range this new
+   *                     reading covers
+   * @param newValue the value of the reading at {@code newProgress}
+   */
+  protected void extend(double newProgress, int newValue) {
+    if (state == null || newProgress < state.oldProgress) {
+      return;
+    }
+
+    // The correctness of this code depends on 100% * count = count.
+    int oldIndex = (int)(state.oldProgress * count);
+    int newIndex = (int)(newProgress * count);
+    int originalOldValue = state.oldValue;
+
+    double fullValueDistance = (double)newValue - state.oldValue;
+    double fullProgressDistance = newProgress - state.oldProgress;
+    double originalOldProgress = state.oldProgress;
+
+    // In this loop we detect each subinterval boundary within the
+    //  range from the old progress to the new one.  Then we
+    //  interpolate the value from the old value to the new one to
+    //  infer what its value might have been at each such boundary.
+    //  Lastly we make the necessary calls to extendInternal to fold
+    //  in the data for each trapezoid where no such trapezoid
+    //  crosses a boundary.
+    for (int closee = oldIndex; closee < newIndex; ++closee) {
+      double interpolationProgress = (double)(closee + 1) / count;
+      // In floats, x * y / y might not equal x.
+      interpolationProgress = Math.min(interpolationProgress, newProgress);
+
+      double progressLength = (interpolationProgress - originalOldProgress);
+      double interpolationProportion = progressLength / fullProgressDistance;
+
+      double interpolationValueDistance
+        = fullValueDistance * interpolationProportion;
+
+      // estimates the value at the next [interpolated] subsegment boundary
+      int interpolationValue
+        = (int)interpolationValueDistance + originalOldValue;
+
+      extendInternal(interpolationProgress, interpolationValue);
+
+      advanceState(interpolationProgress, interpolationValue);
+
+      values[closee] = (int)state.currentAccumulation;
+      initializeInterval();
+    }
+
+    extendInternal(newProgress, newValue);
+    advanceState(newProgress, newValue);
+
+    if (newIndex == count) {
+      state = null;
+    }
+  }
+
+  protected void advanceState(double newProgress, int newValue) {
+    state.oldValue = newValue;
+    state.oldProgress = newProgress;
+  }
+
+  int getCount() {
+    return count;
+  }
+
+  int get(int index) {
+    return values[index];
+  }
+}
diff --git a/mapreduce/src/java/org/apache/hadoop/mapred/ProgressSplitsBlock.java b/mapreduce/src/java/org/apache/hadoop/mapred/ProgressSplitsBlock.java
new file mode 100644
index 00000000000..d3912438527
--- /dev/null
+++ b/mapreduce/src/java/org/apache/hadoop/mapred/ProgressSplitsBlock.java
@@ -0,0 +1,86 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.mapred;
+
+import java.util.List;
+
+/*
+ * This object gathers the [currently four] PeriodicStatsAccumulator's
+ *  that we maintain for a particular task attempt, for packaging and
+ *  handling as a single object.
+ */
+public class ProgressSplitsBlock {
+  final PeriodicStatsAccumulator progressWallclockTime;
+  final PeriodicStatsAccumulator progressCPUTime;
+  final PeriodicStatsAccumulator progressVirtualMemoryKbytes;
+  final PeriodicStatsAccumulator progressPhysicalMemoryKbytes;
+
+  static final int[] NULL_ARRAY = new int[0];
+
+  static final int WALLCLOCK_TIME_INDEX = 0;
+  static final int CPU_TIME_INDEX = 1;
+  static final int VIRTUAL_MEMORY_KBYTES_INDEX = 2;
+  static final int PHYSICAL_MEMORY_KBYTES_INDEX = 3;
+
+  static final int DEFAULT_NUMBER_PROGRESS_SPLITS = 12;
+
+  ProgressSplitsBlock(int numberSplits) {
+    progressWallclockTime
+      = new CumulativePeriodicStats(numberSplits);
+    progressCPUTime
+      = new CumulativePeriodicStats(numberSplits);
+    progressVirtualMemoryKbytes
+      = new StatePeriodicStats(numberSplits);
+    progressPhysicalMemoryKbytes
+      = new StatePeriodicStats(numberSplits);
+  }
+
+  // this coordinates with LoggedTaskAttempt.SplitVectorKind
+  int[][] burst() {
+    int[][] result = new int[4][];
+
+    result[WALLCLOCK_TIME_INDEX] = progressWallclockTime.getValues();
+    result[CPU_TIME_INDEX] = progressCPUTime.getValues();
+    result[VIRTUAL_MEMORY_KBYTES_INDEX] = progressVirtualMemoryKbytes.getValues();
+    result[PHYSICAL_MEMORY_KBYTES_INDEX] = progressPhysicalMemoryKbytes.getValues();
+
+    return result;
+  }
+
+  static public int[] arrayGet(int[][] burstedBlock, int index) {
+    return burstedBlock == null ? NULL_ARRAY : burstedBlock[index];
+  }
+
+  static public int[] arrayGetWallclockTime(int[][] burstedBlock) {
+    return arrayGet(burstedBlock, WALLCLOCK_TIME_INDEX);
+  }
+
+  static public int[] arrayGetCPUTime(int[][] burstedBlock) {
+    return arrayGet(burstedBlock, CPU_TIME_INDEX);
+  }
+
+  static public int[] arrayGetVMemKbytes(int[][] burstedBlock) {
+    return arrayGet(burstedBlock, VIRTUAL_MEMORY_KBYTES_INDEX);
+  }
+
+  static public int[] arrayGetPhysMemKbytes(int[][] burstedBlock) {
+    return arrayGet(burstedBlock, PHYSICAL_MEMORY_KBYTES_INDEX);
+  }
+}
diff --git a/mapreduce/src/java/org/apache/hadoop/mapred/StatePeriodicStats.java b/mapreduce/src/java/org/apache/hadoop/mapred/StatePeriodicStats.java
new file mode 100644
index 00000000000..e9577b303b2
--- /dev/null
+++ b/mapreduce/src/java/org/apache/hadoop/mapred/StatePeriodicStats.java
@@ -0,0 +1,57 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.mapred;
+
+
+/**
+ *
+ * This class is a concrete PeriodicStatsAccumulator that deals with
+ *  measurements where the raw data are a measurement of a
+ *  time-varying quantity.  The result in each bucket is the estimate
+ *  of the progress-weighted mean value of that quantity over the
+ *  progress range covered by the bucket.
+ *
+ *
+ * <p>An easy-to-understand example of this kind of quantity would be
+ * a temperature. It makes sense to consider the mean temperature
+ * over a progress range.
+ *
+ */
+class StatePeriodicStats extends PeriodicStatsAccumulator {
+ StatePeriodicStats(int count) {
+ super(count);
+ }
+
+ /**
+ *
+ * accumulates a new reading by keeping a running account of the
+ * area under the piecewise linear curve marked by pairs of
+ * {@code newProgress, newValue} .
+ */
+ @Override
+ protected void extendInternal(double newProgress, int newValue) {
+ if (state == null) {
+ return;
+ }
+
+ // the effective height of this trapezoid if rectangularized
+ double mean = ((double)newValue + (double)state.oldValue)/2.0D;
+
+ // conceptually mean * (newProgress - state.oldProgress) / (1 / count)
+ state.currentAccumulation += mean * (newProgress - state.oldProgress) * count;
+ }
+}
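A quick illustrative sketch, not part of the patch: the class below is hypothetical, and it assumes compilation in org.apache.hadoop.mapred so that it can reach the package-private accessors. It runs exactly the worked example from the extend() javadoc in PeriodicStatsAccumulator above.

package org.apache.hadoop.mapred;

class PeriodicStatsExample {
  public static void main(String[] args) {
    // Five buckets over the progress range [0.0, 1.0).
    CumulativePeriodicStats stats = new CumulativePeriodicStats(5);

    // First reading: value 300 at progress 0.3.  This closes bucket 0
    //  with an interpolated value of 200 and leaves 100 accumulating
    //  toward bucket 1.
    stats.extend(0.3D, 300);

    // Second reading: value 700 at progress 0.7.  This makes the three
    //  extendInternal calls described in the javadoc [at 0.4/400,
    //  0.6/600, and 0.7/700], closing buckets 1 and 2 with 200 each.
    stats.extend(0.7D, 700);

    for (int i = 0; i < stats.getCount(); ++i) {
      System.out.println("bucket " + i + " = " + stats.get(i));
    }
    // prints 200, 200, 200, -1, -1 : buckets 3 and 4 are still open,
    //  with 100 units pending toward bucket 3.
  }
}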
diff --git a/mapreduce/src/java/org/apache/hadoop/mapred/TaskInProgress.java b/mapreduce/src/java/org/apache/hadoop/mapred/TaskInProgress.java
index 0a703c90658..11738dcc5fe 100644
--- a/mapreduce/src/java/org/apache/hadoop/mapred/TaskInProgress.java
+++ b/mapreduce/src/java/org/apache/hadoop/mapred/TaskInProgress.java
@@ -31,25 +31,32 @@ import java.util.TreeSet;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
+
import org.apache.hadoop.mapred.JobInProgress.DataStatistics;
import org.apache.hadoop.mapred.SortedRanges.Range;
+
+import org.apache.hadoop.mapreduce.TaskCounter;
import org.apache.hadoop.mapreduce.TaskType;
import org.apache.hadoop.mapreduce.jobhistory.JobHistory;
import org.apache.hadoop.mapreduce.jobhistory.TaskUpdatedEvent;
import org.apache.hadoop.mapreduce.split.JobSplit.TaskSplitMetaInfo;
+
+import org.apache.hadoop.mapreduce.server.jobtracker.JTConfig;
+
import org.apache.hadoop.net.Node;
+
/*************************************************************
* TaskInProgress maintains all the info needed for a
* Task in the lifetime of its owning Job. A given Task
* might be speculatively executed or reexecuted, so we
* need a level of indirection above the running-id itself.
*
- * A given TaskInProgress contains multiple taskids,
+ * A given TaskInProgress contains multiple task attempt ids,
* 0 or more of which might be executing at any one time.
- * (That's what allows speculative execution.) A taskid
- * is now *never* recycled. A TIP allocates enough taskids
+ * (That's what allows speculative execution.) A task attempt id
+ * is now *never* recycled. A TIP allocates enough task attempt ids
* to account for all the speculation and failures it will
* ever have to handle. Once those are up, the TIP is dead.
* **************************************************************
@@ -60,6 +67,10 @@ class TaskInProgress {
static final long SPECULATIVE_LAG = 60 * 1000;
private static final int NUM_ATTEMPTS_PER_RESTART = 1000;
+ private static final long MEMORY_SPLITS_RESOLUTION = 1024;
+
+ static final int DEFAULT_STATISTICS_INTERVALS = 12;
+
public static final Log LOG = LogFactory.getLog(TaskInProgress.class);
// Defines the TIP
@@ -91,6 +102,10 @@ class TaskInProgress {
private volatile boolean skipping = false;
private boolean jobCleanup = false;
private boolean jobSetup = false;
+
+ private static Enum CPU_COUNTER_KEY = TaskCounter.CPU_MILLISECONDS;
+ private static Enum VM_BYTES_KEY = TaskCounter.VIRTUAL_MEMORY_BYTES;
+ private static Enum PHYSICAL_BYTES_KEY = TaskCounter.PHYSICAL_MEMORY_BYTES;
// The 'next' usable taskid of this tip
int nextTaskId = 0;
@@ -109,12 +124,20 @@ class TaskInProgress {
private JobConf conf;
   private Map<TaskAttemptID, List<String>> taskDiagnosticData =
+  // Each SplitVectorKind holds the accessor code for one kind of
+  //  split vector, keyed by ordinal via get(List<List<Integer>>) and
+  //  set(List<List<Integer>>, List<Integer>) .
+  // This makes it easier to add another kind in the future.
+  public enum SplitVectorKind {
+
+    WALLCLOCK_TIME {
+      @Override
+      public List<Integer> get(LoggedTaskAttempt attempt) {
+        return attempt.getClockSplits();
+      }
+      @Override
+      public void set(LoggedTaskAttempt attempt, List<Integer> newValue) {
+        attempt.setClockSplits(newValue);
+      }
+    },
+
+    CPU_USAGE {
+      @Override
+      public List<Integer> get(LoggedTaskAttempt attempt) {
+        return attempt.getCpuUsages();
+      }
+      @Override
+      public void set(LoggedTaskAttempt attempt, List<Integer> newValue) {
+        attempt.setCpuUsages(newValue);
+      }
+    },
+
+    VIRTUAL_MEMORY_KBYTES {
+      @Override
+      public List<Integer> get(LoggedTaskAttempt attempt) {
+        return attempt.getVMemKbytes();
+      }
+      @Override
+      public void set(LoggedTaskAttempt attempt, List<Integer> newValue) {
+        attempt.setVMemKbytes(newValue);
+      }
+    },
+
+    PHYSICAL_MEMORY_KBYTES {
+      @Override
+      public List<Integer> get(LoggedTaskAttempt attempt) {
+        return attempt.getPhysMemKbytes();
+      }
+      @Override
+      public void set(LoggedTaskAttempt attempt, List<Integer> newValue) {
+        attempt.setPhysMemKbytes(newValue);
+      }
+    };
+
+    static private final List<List<Integer>> NULL_SPLITS_VECTOR
+      = new ArrayList<List<Integer>>();
+
+    static {
+      for (SplitVectorKind kind : SplitVectorKind.values() ) {
+        NULL_SPLITS_VECTOR.add(new ArrayList<Integer>());
+      }
+    }
+
+    abstract public List<Integer> get(LoggedTaskAttempt attempt);
+
+    abstract public void set(LoggedTaskAttempt attempt, List<Integer> newValue);
+
+    public List<Integer> get(List<List<Integer>> listSplits) {
+      return listSplits.get(this.ordinal());
+    }
+
+    public void set(List<List<Integer>> listSplits, List<Integer> newValue) {
+      listSplits.set(this.ordinal(), newValue);
+    }
+
+    static public List<List<Integer>> getNullSplitsVector() {
+      return NULL_SPLITS_VECTOR;
+    }
+  }
+
+  /**
+   *
+   * @return a list of all splits vectors, ordered in enumeral order
+   *          within {@link SplitVectorKind} .  Do NOT use hard-coded
+   *          indices into the returned list to get individual values;
+   *          use {@code SplitVectorKind.get(LoggedTaskAttempt)} instead.
+   */
+  public List<List<Integer>> allSplitVectors() {
+    List<List<Integer>> result
+      = new ArrayList<List<Integer>>(SplitVectorKind.values().length);
+
+ for (SplitVectorKind kind : SplitVectorKind.values() ) {
+ result.add(kind.get(this));
+ }
+
+ return result;
+ }
+
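A usage sketch, not part of the patch: the helper class below is hypothetical, but it touches only members visible in this diff [allSplitVectors(), SplitVectorKind's get, and getNullSplitsVector()]. It shows the recommended kind-keyed access in place of hard-coded indices.

import java.util.List;

import org.apache.hadoop.tools.rumen.LoggedTaskAttempt;

class SplitVectorExample {
  // Fetches one kind of splits vector out of the combined list; the
  //  ordinal-based get() keeps the index arithmetic inside
  //  SplitVectorKind itself.
  static List<Integer> clockSplits(LoggedTaskAttempt attempt) {
    List<List<Integer>> vectors = attempt == null
        ? LoggedTaskAttempt.SplitVectorKind.getNullSplitsVector()
        : attempt.allSplitVectors();
    return LoggedTaskAttempt.SplitVectorKind.WALLCLOCK_TIME.get(vectors);
  }
}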
  static private Set<String> alreadySeenAnySetterAttributes =
+  public MapTaskAttemptInfo(State state, TaskInfo taskInfo,
+                            long runtime,
+                            List<List<Integer>> allSplits) {
+ super(state, taskInfo,
+ allSplits == null
+ ? LoggedTaskAttempt.SplitVectorKind.getNullSplitsVector()
+ : allSplits);
this.runtime = runtime;
}
+ /**
+ *
+ * @deprecated please use the constructor with
+ * {@code (state, taskInfo, runtime,
+   *             List<List<Integer>> allSplits)}
+ * instead.
+ *
+ * see {@link LoggedTaskAttempt} for an explanation of
+ * {@code allSplits}.
+ *
+ * If there are no known splits, use {@code null}.
+ */
+ @Deprecated
+ public MapTaskAttemptInfo(State state, TaskInfo taskInfo,
+ long runtime) {
+ this(state, taskInfo, runtime, null);
+ }
+
@Override
public long getRuntime() {
return getMapRuntime();
diff --git a/mapreduce/src/tools/org/apache/hadoop/tools/rumen/ReduceAttempt20LineHistoryEventEmitter.java b/mapreduce/src/tools/org/apache/hadoop/tools/rumen/ReduceAttempt20LineHistoryEventEmitter.java
index 974475aa548..234a4338406 100644
--- a/mapreduce/src/tools/org/apache/hadoop/tools/rumen/ReduceAttempt20LineHistoryEventEmitter.java
+++ b/mapreduce/src/tools/org/apache/hadoop/tools/rumen/ReduceAttempt20LineHistoryEventEmitter.java
@@ -28,8 +28,8 @@ import org.apache.hadoop.mapreduce.TaskAttemptID;
import org.apache.hadoop.mapreduce.jobhistory.HistoryEvent;
import org.apache.hadoop.mapreduce.jobhistory.ReduceAttemptFinishedEvent;
-public class ReduceAttempt20LineHistoryEventEmitter extends
- TaskAttempt20LineEventEmitter {
+public class ReduceAttempt20LineHistoryEventEmitter
+ extends TaskAttempt20LineEventEmitter {
  static List<SingleEventEmitter> nonFinals =
+  public ReduceTaskAttemptInfo(State state, TaskInfo taskInfo,
+                               long shuffleTime, long mergeTime,
+                               long reduceTime,
+                               List<List<Integer>> allSplits) {
+ super(state, taskInfo,
+ allSplits == null
+ ? LoggedTaskAttempt.SplitVectorKind.getNullSplitsVector()
+ : allSplits);
this.shuffleTime = shuffleTime;
this.mergeTime = mergeTime;
this.reduceTime = reduceTime;
}
+
+ /**
+ *
+ * @deprecated please use the constructor with
+   *             {@code (state, taskInfo, shuffleTime, mergeTime, reduceTime,
+   *                     List<List<Integer>> allSplits)}
+ * instead.
+ *
+ * see {@link LoggedTaskAttempt} for an explanation of
+ * {@code allSplits}.
+ *
+ * If there are no known splits, use {@code null}.
+ */
+ @Deprecated
+ public ReduceTaskAttemptInfo(State state, TaskInfo taskInfo, long shuffleTime,
+ long mergeTime, long reduceTime) {
+ this(state, taskInfo, shuffleTime, mergeTime, reduceTime, null);
+ }
+
/**
* Get the runtime for the reduce phase of the reduce task-attempt.
*
@@ -67,5 +91,4 @@ public class ReduceTaskAttemptInfo extends TaskAttemptInfo {
public long getRuntime() {
return (getShuffleRuntime() + getMergeRuntime() + getReduceRuntime());
}
-
}
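A sketch of the constructor pairing, not part of the patch: the class below is hypothetical and the timing arguments are arbitrary. Passing null for allSplits is equivalent to calling the deprecated constructor; both end up storing SplitVectorKind.getNullSplitsVector().

import org.apache.hadoop.mapred.TaskStatus.State;
import org.apache.hadoop.tools.rumen.ReduceTaskAttemptInfo;
import org.apache.hadoop.tools.rumen.TaskInfo;

class AttemptInfoExample {
  static ReduceTaskAttemptInfo make(State state, TaskInfo info) {
    // shuffle/merge/reduce times in milliseconds; the trailing null
    //  means "no known splits" and is replaced internally by the
    //  null splits vector.
    return new ReduceTaskAttemptInfo(state, info, 100L, 200L, 300L, null);
  }
}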
diff --git a/mapreduce/src/tools/org/apache/hadoop/tools/rumen/TaskAttempt20LineEventEmitter.java b/mapreduce/src/tools/org/apache/hadoop/tools/rumen/TaskAttempt20LineEventEmitter.java
index 7814c480848..77f35a7ceb5 100644
--- a/mapreduce/src/tools/org/apache/hadoop/tools/rumen/TaskAttempt20LineEventEmitter.java
+++ b/mapreduce/src/tools/org/apache/hadoop/tools/rumen/TaskAttempt20LineEventEmitter.java
@@ -138,9 +138,10 @@ public abstract class TaskAttempt20LineEventEmitter extends HistoryEventEmitter
TaskAttempt20LineEventEmitter that =
(TaskAttempt20LineEventEmitter) thatg;
- return new TaskAttemptUnsuccessfulCompletionEvent(taskAttemptID,
- that.originalTaskType, status, Long.parseLong(finishTime),
- hostName, error);
+ return new TaskAttemptUnsuccessfulCompletionEvent
+ (taskAttemptID,
+ that.originalTaskType, status, Long.parseLong(finishTime),
+ hostName, error, null);
}
return null;
diff --git a/mapreduce/src/tools/org/apache/hadoop/tools/rumen/TaskAttemptInfo.java b/mapreduce/src/tools/org/apache/hadoop/tools/rumen/TaskAttemptInfo.java
index b8921e92f28..d13974f2ca2 100644
--- a/mapreduce/src/tools/org/apache/hadoop/tools/rumen/TaskAttemptInfo.java
+++ b/mapreduce/src/tools/org/apache/hadoop/tools/rumen/TaskAttemptInfo.java
@@ -17,6 +17,8 @@
*/
package org.apache.hadoop.tools.rumen;
+import java.util.List;
+
import org.apache.hadoop.mapred.TaskStatus.State;
/**
@@ -27,13 +29,22 @@ public abstract class TaskAttemptInfo {
protected final State state;
protected final TaskInfo taskInfo;
- protected TaskAttemptInfo(State state, TaskInfo taskInfo) {
+  protected final List<List<Integer>> allSplits;
+
+ protected TaskAttemptInfo
+    (State state, TaskInfo taskInfo, List<List<Integer>> allSplits) {
if (state == State.SUCCEEDED || state == State.FAILED) {
this.state = state;
} else {
throw new IllegalArgumentException("status cannot be " + state);
}
this.taskInfo = taskInfo;
+ this.allSplits = allSplits;
+ }
+
+ protected TaskAttemptInfo
+ (State state, TaskInfo taskInfo) {
+ this(state, taskInfo, LoggedTaskAttempt.SplitVectorKind.getNullSplitsVector());
}
/**
@@ -60,4 +71,8 @@ public abstract class TaskAttemptInfo {
public TaskInfo getTaskInfo() {
return taskInfo;
}
+
+  public List<List<Integer>> allSplitVectors() {
+    return allSplits;
+  }
+    List<List<Integer>> allSplitVectors = loggedAttempt.allSplitVectors();
+
State state = convertState(loggedAttempt.getResult());
if (loggedTask.getTaskType() == Values.MAP) {
long taskTime;
@@ -594,7 +598,7 @@ public class ZombieJob implements JobStory {
taskTime = loggedAttempt.getFinishTime() - loggedAttempt.getStartTime();
}
taskTime = sanitizeTaskRuntime(taskTime, loggedAttempt.getAttemptID());
- return new MapTaskAttemptInfo(state, taskInfo, taskTime);
+ return new MapTaskAttemptInfo(state, taskInfo, taskTime, allSplitVectors);
} else if (loggedTask.getTaskType() == Values.REDUCE) {
long startTime = loggedAttempt.getStartTime();
long mergeDone = loggedAttempt.getSortFinished();
@@ -605,7 +609,8 @@ public class ZombieJob implements JobStory {
// haven't seen reduce task with startTime=0 ever. But if this happens,
// make up a reduceTime with no shuffle/merge.
long reduceTime = makeUpReduceRuntime(state);
- return new ReduceTaskAttemptInfo(state, taskInfo, 0, 0, reduceTime);
+ return new ReduceTaskAttemptInfo
+ (state, taskInfo, 0, 0, reduceTime, allSplitVectors);
} else {
if (shuffleDone <= 0) {
shuffleDone = startTime;
@@ -619,7 +624,7 @@ public class ZombieJob implements JobStory {
reduceTime = sanitizeTaskRuntime(reduceTime, loggedAttempt.getAttemptID());
return new ReduceTaskAttemptInfo(state, taskInfo, shuffleTime,
- mergeTime, reduceTime);
+ mergeTime, reduceTime, allSplitVectors);
}
} else {
throw new IllegalArgumentException("taskType for "
@@ -700,7 +705,8 @@ public class ZombieJob implements JobStory {
runtime = makeUpMapRuntime(state, locality);
runtime = sanitizeTaskRuntime(runtime, makeTaskAttemptID(taskType,
taskNumber, taskAttemptNumber).toString());
- TaskAttemptInfo tai = new MapTaskAttemptInfo(state, taskInfo, runtime);
+ TaskAttemptInfo tai
+ = new MapTaskAttemptInfo(state, taskInfo, runtime, null);
return tai;
} else if (taskType == TaskType.REDUCE) {
State state = State.SUCCEEDED;
@@ -711,8 +717,8 @@ public class ZombieJob implements JobStory {
// TODO make up state
// state = makeUpState(taskAttemptNumber, job.getReducerTriesToSucceed());
reduceTime = makeUpReduceRuntime(state);
- TaskAttemptInfo tai = new ReduceTaskAttemptInfo(state, taskInfo,
- shuffleTime, sortTime, reduceTime);
+ TaskAttemptInfo tai = new ReduceTaskAttemptInfo
+ (state, taskInfo, shuffleTime, sortTime, reduceTime, null);
return tai;
}
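A consumption sketch, not part of the patch: the class below is hypothetical and assumes the org.apache.hadoop.mapred package for access to the package-private burst(). It shows the round trip the history events rely on: burst() flattens the four accumulators into an int[4][] whose row order coordinates with LoggedTaskAttempt.SplitVectorKind, and the static arrayGet* helpers tolerate a null block by returning the empty NULL_ARRAY.

package org.apache.hadoop.mapred;

class BurstExample {
  static int[] wallclockSplits(ProgressSplitsBlock splitsBlock) {
    int[][] bursted = splitsBlock == null ? null : splitsBlock.burst();
    // arrayGetWallclockTime(null) yields an empty array, not an NPE.
    return ProgressSplitsBlock.arrayGetWallclockTime(bursted);
  }
}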