MAPREDUCE-5873. Shuffle bandwidth computation includes time spent waiting for maps. Contributed by Siqi Li
(cherry picked from commit b9edad6403
)
This commit is contained in:
parent
3ee0a5892b
commit
1179a0cb07
|
@ -210,6 +210,9 @@ Release 2.6.0 - UNRELEASED
|
||||||
MAPREDUCE-6115. TestPipeApplication#testSubmitter fails in trunk (Binglin
|
MAPREDUCE-6115. TestPipeApplication#testSubmitter fails in trunk (Binglin
|
||||||
Chang via jlowe)
|
Chang via jlowe)
|
||||||
|
|
||||||
|
MAPREDUCE-5873. Shuffle bandwidth computation includes time spent waiting
|
||||||
|
for maps (Siqi Li via jlowe)
|
||||||
|
|
||||||
Release 2.5.1 - 2014-09-05
|
Release 2.5.1 - 2014-09-05
|
||||||
|
|
||||||
INCOMPATIBLE CHANGES
|
INCOMPATIBLE CHANGES
|
||||||
|
|
|
@ -544,7 +544,7 @@ class Fetcher<K,V> extends Thread {
|
||||||
retryStartTime = 0;
|
retryStartTime = 0;
|
||||||
|
|
||||||
scheduler.copySucceeded(mapId, host, compressedLength,
|
scheduler.copySucceeded(mapId, host, compressedLength,
|
||||||
endTime - startTime, mapOutput);
|
startTime, endTime, mapOutput);
|
||||||
// Note successful shuffle
|
// Note successful shuffle
|
||||||
remaining.remove(mapId);
|
remaining.remove(mapId);
|
||||||
metrics.successFetch();
|
metrics.successFetch();
|
||||||
|
|
|
@ -162,7 +162,7 @@ class LocalFetcher<K,V> extends Fetcher<K, V> {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
scheduler.copySucceeded(mapTaskId, LOCALHOST, compressedLength, 0,
|
scheduler.copySucceeded(mapTaskId, LOCALHOST, compressedLength, 0, 0,
|
||||||
mapOutput);
|
mapOutput);
|
||||||
return true; // successful fetch.
|
return true; // successful fetch.
|
||||||
}
|
}
|
||||||
|
|
|
@ -23,6 +23,7 @@ import java.net.URI;
|
||||||
import java.net.UnknownHostException;
|
import java.net.UnknownHostException;
|
||||||
import java.text.DecimalFormat;
|
import java.text.DecimalFormat;
|
||||||
import java.util.ArrayList;
|
import java.util.ArrayList;
|
||||||
|
import java.util.Collections;
|
||||||
import java.util.HashMap;
|
import java.util.HashMap;
|
||||||
import java.util.HashSet;
|
import java.util.HashSet;
|
||||||
import java.util.Iterator;
|
import java.util.Iterator;
|
||||||
|
@ -64,6 +65,7 @@ public class ShuffleSchedulerImpl<K,V> implements ShuffleScheduler<K,V> {
|
||||||
private static final long INITIAL_PENALTY = 10000;
|
private static final long INITIAL_PENALTY = 10000;
|
||||||
private static final float PENALTY_GROWTH_RATE = 1.3f;
|
private static final float PENALTY_GROWTH_RATE = 1.3f;
|
||||||
private final static int REPORT_FAILURE_LIMIT = 10;
|
private final static int REPORT_FAILURE_LIMIT = 10;
|
||||||
|
private static final float BYTES_PER_MILLIS_TO_MBS = 1000f / 1024 / 1024;
|
||||||
|
|
||||||
private final boolean[] finishedMaps;
|
private final boolean[] finishedMaps;
|
||||||
|
|
||||||
|
@ -92,6 +94,8 @@ public class ShuffleSchedulerImpl<K,V> implements ShuffleScheduler<K,V> {
|
||||||
private final long startTime;
|
private final long startTime;
|
||||||
private long lastProgressTime;
|
private long lastProgressTime;
|
||||||
|
|
||||||
|
private final CopyTimeTracker copyTimeTracker;
|
||||||
|
|
||||||
private volatile int maxMapRuntime = 0;
|
private volatile int maxMapRuntime = 0;
|
||||||
private final int maxFailedUniqueFetches;
|
private final int maxFailedUniqueFetches;
|
||||||
private final int maxFetchFailuresBeforeReporting;
|
private final int maxFetchFailuresBeforeReporting;
|
||||||
|
@ -112,7 +116,7 @@ public class ShuffleSchedulerImpl<K,V> implements ShuffleScheduler<K,V> {
|
||||||
Counters.Counter failedShuffleCounter) {
|
Counters.Counter failedShuffleCounter) {
|
||||||
totalMaps = job.getNumMapTasks();
|
totalMaps = job.getNumMapTasks();
|
||||||
abortFailureLimit = Math.max(30, totalMaps / 10);
|
abortFailureLimit = Math.max(30, totalMaps / 10);
|
||||||
|
copyTimeTracker = new CopyTimeTracker();
|
||||||
remainingMaps = totalMaps;
|
remainingMaps = totalMaps;
|
||||||
finishedMaps = new boolean[remainingMaps];
|
finishedMaps = new boolean[remainingMaps];
|
||||||
this.reporter = reporter;
|
this.reporter = reporter;
|
||||||
|
@ -180,7 +184,8 @@ public class ShuffleSchedulerImpl<K,V> implements ShuffleScheduler<K,V> {
|
||||||
public synchronized void copySucceeded(TaskAttemptID mapId,
|
public synchronized void copySucceeded(TaskAttemptID mapId,
|
||||||
MapHost host,
|
MapHost host,
|
||||||
long bytes,
|
long bytes,
|
||||||
long millis,
|
long startMillis,
|
||||||
|
long endMillis,
|
||||||
MapOutput<K,V> output
|
MapOutput<K,V> output
|
||||||
) throws IOException {
|
) throws IOException {
|
||||||
failureCounts.remove(mapId);
|
failureCounts.remove(mapId);
|
||||||
|
@ -195,27 +200,46 @@ public class ShuffleSchedulerImpl<K,V> implements ShuffleScheduler<K,V> {
|
||||||
notifyAll();
|
notifyAll();
|
||||||
}
|
}
|
||||||
|
|
||||||
// update the status
|
// update single copy task status
|
||||||
|
long copyMillis = (endMillis - startMillis);
|
||||||
|
if (copyMillis == 0) copyMillis = 1;
|
||||||
|
float bytesPerMillis = (float) bytes / copyMillis;
|
||||||
|
float transferRate = bytesPerMillis * BYTES_PER_MILLIS_TO_MBS;
|
||||||
|
String individualProgress = "copy task(" + mapId + " succeeded"
|
||||||
|
+ " at " + mbpsFormat.format(transferRate) + " MB/s)";
|
||||||
|
// update the aggregated status
|
||||||
|
copyTimeTracker.add(startMillis, endMillis);
|
||||||
|
|
||||||
totalBytesShuffledTillNow += bytes;
|
totalBytesShuffledTillNow += bytes;
|
||||||
updateStatus();
|
updateStatus(individualProgress);
|
||||||
reduceShuffleBytes.increment(bytes);
|
reduceShuffleBytes.increment(bytes);
|
||||||
lastProgressTime = Time.monotonicNow();
|
lastProgressTime = Time.monotonicNow();
|
||||||
LOG.debug("map " + mapId + " done " + status.getStateString());
|
LOG.debug("map " + mapId + " done " + status.getStateString());
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
private void updateStatus() {
|
private synchronized void updateStatus(String individualProgress) {
|
||||||
float mbs = (float) totalBytesShuffledTillNow / (1024 * 1024);
|
|
||||||
int mapsDone = totalMaps - remainingMaps;
|
int mapsDone = totalMaps - remainingMaps;
|
||||||
long secsSinceStart = (Time.monotonicNow() - startTime) / 1000 + 1;
|
long totalCopyMillis = copyTimeTracker.getCopyMillis();
|
||||||
|
if (totalCopyMillis == 0) totalCopyMillis = 1;
|
||||||
float transferRate = mbs / secsSinceStart;
|
float bytesPerMillis = (float) totalBytesShuffledTillNow / totalCopyMillis;
|
||||||
|
float transferRate = bytesPerMillis * BYTES_PER_MILLIS_TO_MBS;
|
||||||
progress.set((float) mapsDone / totalMaps);
|
progress.set((float) mapsDone / totalMaps);
|
||||||
String statusString = mapsDone + " / " + totalMaps + " copied.";
|
String statusString = mapsDone + " / " + totalMaps + " copied.";
|
||||||
status.setStateString(statusString);
|
status.setStateString(statusString);
|
||||||
|
|
||||||
progress.setStatus("copy(" + mapsDone + " of " + totalMaps + " at "
|
if (individualProgress != null) {
|
||||||
+ mbpsFormat.format(transferRate) + " MB/s)");
|
progress.setStatus(individualProgress + " Aggregated copy rate(" +
|
||||||
|
mapsDone + " of " + totalMaps + " at " +
|
||||||
|
mbpsFormat.format(transferRate) + " MB/s)");
|
||||||
|
} else {
|
||||||
|
progress.setStatus("copy(" + mapsDone + " of " + totalMaps + " at "
|
||||||
|
+ mbpsFormat.format(transferRate) + " MB/s)");
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
private void updateStatus() {
|
||||||
|
updateStatus(null);
|
||||||
}
|
}
|
||||||
|
|
||||||
public synchronized void hostFailed(String hostname) {
|
public synchronized void hostFailed(String hostname) {
|
||||||
|
@ -520,4 +544,63 @@ public class ShuffleSchedulerImpl<K,V> implements ShuffleScheduler<K,V> {
|
||||||
public int getMaxHostFailures() {
|
public int getMaxHostFailures() {
|
||||||
return maxHostFailures;
|
return maxHostFailures;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
private static class CopyTimeTracker {
|
||||||
|
List<Interval> intervals;
|
||||||
|
long copyMillis;
|
||||||
|
public CopyTimeTracker() {
|
||||||
|
intervals = Collections.emptyList();
|
||||||
|
copyMillis = 0;
|
||||||
|
}
|
||||||
|
public void add(long s, long e) {
|
||||||
|
Interval interval = new Interval(s, e);
|
||||||
|
copyMillis = getTotalCopyMillis(interval);
|
||||||
|
}
|
||||||
|
|
||||||
|
public long getCopyMillis() {
|
||||||
|
return copyMillis;
|
||||||
|
}
|
||||||
|
// This method captures the time during which any copy was in progress
|
||||||
|
// each copy time period is record in the Interval list
|
||||||
|
private long getTotalCopyMillis(Interval newInterval) {
|
||||||
|
if (newInterval == null) {
|
||||||
|
return copyMillis;
|
||||||
|
}
|
||||||
|
List<Interval> result = new ArrayList<Interval>(intervals.size() + 1);
|
||||||
|
for (Interval interval: intervals) {
|
||||||
|
if (interval.end < newInterval.start) {
|
||||||
|
result.add(interval);
|
||||||
|
} else if (interval.start > newInterval.end) {
|
||||||
|
result.add(newInterval);
|
||||||
|
newInterval = interval;
|
||||||
|
} else {
|
||||||
|
newInterval = new Interval(
|
||||||
|
Math.min(interval.start, newInterval.start),
|
||||||
|
Math.max(newInterval.end, interval.end));
|
||||||
|
}
|
||||||
|
}
|
||||||
|
result.add(newInterval);
|
||||||
|
intervals = result;
|
||||||
|
|
||||||
|
//compute total millis
|
||||||
|
long length = 0;
|
||||||
|
for (Interval interval : intervals) {
|
||||||
|
length += interval.getIntervalLength();
|
||||||
|
}
|
||||||
|
return length;
|
||||||
|
}
|
||||||
|
|
||||||
|
private static class Interval {
|
||||||
|
final long start;
|
||||||
|
final long end;
|
||||||
|
public Interval(long s, long e) {
|
||||||
|
start = s;
|
||||||
|
end = e;
|
||||||
|
}
|
||||||
|
|
||||||
|
public long getIntervalLength() {
|
||||||
|
return end - start;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -17,9 +17,20 @@
|
||||||
*/
|
*/
|
||||||
package org.apache.hadoop.mapreduce.task.reduce;
|
package org.apache.hadoop.mapreduce.task.reduce;
|
||||||
|
|
||||||
|
import static org.mockito.Mockito.mock;
|
||||||
|
import org.apache.hadoop.fs.FileSystem;
|
||||||
|
import org.apache.hadoop.fs.LocalDirAllocator;
|
||||||
|
import org.apache.hadoop.io.compress.CompressionCodec;
|
||||||
import org.apache.hadoop.mapred.JobConf;
|
import org.apache.hadoop.mapred.JobConf;
|
||||||
|
import org.apache.hadoop.mapred.MapOutputFile;
|
||||||
|
import org.apache.hadoop.mapred.Reporter;
|
||||||
|
import org.apache.hadoop.mapred.ShuffleConsumerPlugin;
|
||||||
|
import org.apache.hadoop.mapred.Task;
|
||||||
import org.apache.hadoop.mapred.TaskAttemptID;
|
import org.apache.hadoop.mapred.TaskAttemptID;
|
||||||
import org.apache.hadoop.mapred.TaskStatus;
|
import org.apache.hadoop.mapred.TaskStatus;
|
||||||
|
import org.apache.hadoop.mapred.TaskUmbilicalProtocol;
|
||||||
|
import org.apache.hadoop.mapred.Counters.Counter;
|
||||||
|
import org.apache.hadoop.mapred.Task.CombineOutputCollector;
|
||||||
import org.apache.hadoop.mapreduce.JobID;
|
import org.apache.hadoop.mapreduce.JobID;
|
||||||
import org.apache.hadoop.mapreduce.TaskID;
|
import org.apache.hadoop.mapreduce.TaskID;
|
||||||
import org.apache.hadoop.mapreduce.TaskType;
|
import org.apache.hadoop.mapreduce.TaskType;
|
||||||
|
@ -66,4 +77,150 @@ public class TestShuffleScheduler {
|
||||||
0.0f);
|
0.0f);
|
||||||
Assert.assertTrue(scheduler.waitUntilDone(1));
|
Assert.assertTrue(scheduler.waitUntilDone(1));
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@SuppressWarnings("rawtypes")
|
||||||
|
@Test
|
||||||
|
public <K, V> void TestAggregatedTransferRate() throws Exception {
|
||||||
|
JobConf job = new JobConf();
|
||||||
|
job.setNumMapTasks(10);
|
||||||
|
//mock creation
|
||||||
|
TaskUmbilicalProtocol mockUmbilical = mock(TaskUmbilicalProtocol.class);
|
||||||
|
Reporter mockReporter = mock(Reporter.class);
|
||||||
|
FileSystem mockFileSystem = mock(FileSystem.class);
|
||||||
|
Class<? extends org.apache.hadoop.mapred.Reducer> combinerClass = job.getCombinerClass();
|
||||||
|
@SuppressWarnings("unchecked") // needed for mock with generic
|
||||||
|
CombineOutputCollector<K, V> mockCombineOutputCollector =
|
||||||
|
(CombineOutputCollector<K, V>) mock(CombineOutputCollector.class);
|
||||||
|
org.apache.hadoop.mapreduce.TaskAttemptID mockTaskAttemptID =
|
||||||
|
mock(org.apache.hadoop.mapreduce.TaskAttemptID.class);
|
||||||
|
LocalDirAllocator mockLocalDirAllocator = mock(LocalDirAllocator.class);
|
||||||
|
CompressionCodec mockCompressionCodec = mock(CompressionCodec.class);
|
||||||
|
Counter mockCounter = mock(Counter.class);
|
||||||
|
TaskStatus mockTaskStatus = mock(TaskStatus.class);
|
||||||
|
Progress mockProgress = mock(Progress.class);
|
||||||
|
MapOutputFile mockMapOutputFile = mock(MapOutputFile.class);
|
||||||
|
Task mockTask = mock(Task.class);
|
||||||
|
@SuppressWarnings("unchecked")
|
||||||
|
MapOutput<K, V> output = mock(MapOutput.class);
|
||||||
|
|
||||||
|
ShuffleConsumerPlugin.Context<K, V> context =
|
||||||
|
new ShuffleConsumerPlugin.Context<K, V>(mockTaskAttemptID, job, mockFileSystem,
|
||||||
|
mockUmbilical, mockLocalDirAllocator,
|
||||||
|
mockReporter, mockCompressionCodec,
|
||||||
|
combinerClass, mockCombineOutputCollector,
|
||||||
|
mockCounter, mockCounter, mockCounter,
|
||||||
|
mockCounter, mockCounter, mockCounter,
|
||||||
|
mockTaskStatus, mockProgress, mockProgress,
|
||||||
|
mockTask, mockMapOutputFile, null);
|
||||||
|
TaskStatus status = new TaskStatus() {
|
||||||
|
@Override
|
||||||
|
public boolean getIsMap() {
|
||||||
|
return false;
|
||||||
|
}
|
||||||
|
@Override
|
||||||
|
public void addFetchFailedMap(TaskAttemptID mapTaskId) {
|
||||||
|
}
|
||||||
|
};
|
||||||
|
Progress progress = new Progress();
|
||||||
|
ShuffleSchedulerImpl<K, V> scheduler = new ShuffleSchedulerImpl<K, V>(job, status, null,
|
||||||
|
null, progress, context.getShuffledMapsCounter(),
|
||||||
|
context.getReduceShuffleBytes(), context.getFailedShuffleCounter());
|
||||||
|
TaskAttemptID attemptID0 = new TaskAttemptID(
|
||||||
|
new org.apache.hadoop.mapred.TaskID(
|
||||||
|
new JobID("test",0), TaskType.MAP, 0), 0);
|
||||||
|
|
||||||
|
//adding the 1st interval, 40MB from 60s to 100s
|
||||||
|
long bytes = (long)40 * 1024 * 1024;
|
||||||
|
scheduler.copySucceeded(attemptID0, new MapHost(null, null), bytes, 60000, 100000, output);
|
||||||
|
Assert.assertEquals("copy task(attempt_test_0000_m_000000_0 succeeded at 1.00 MB/s)"
|
||||||
|
+ " Aggregated copy rate(1 of 10 at 1.00 MB/s)", progress.toString());
|
||||||
|
|
||||||
|
TaskAttemptID attemptID1 = new TaskAttemptID(
|
||||||
|
new org.apache.hadoop.mapred.TaskID(
|
||||||
|
new JobID("test",0), TaskType.MAP, 1), 1);
|
||||||
|
|
||||||
|
//adding the 2nd interval before the 1st interval, 50MB from 0s to 50s
|
||||||
|
bytes = (long)50 * 1024 * 1024;
|
||||||
|
scheduler.copySucceeded(attemptID1, new MapHost(null, null), bytes, 0, 50000, output);
|
||||||
|
Assert.assertEquals("copy task(attempt_test_0000_m_000001_1 succeeded at 1.00 MB/s)"
|
||||||
|
+ " Aggregated copy rate(2 of 10 at 1.00 MB/s)", progress.toString());
|
||||||
|
|
||||||
|
TaskAttemptID attemptID2 = new TaskAttemptID(
|
||||||
|
new org.apache.hadoop.mapred.TaskID(
|
||||||
|
new JobID("test",0), TaskType.MAP, 2), 2);
|
||||||
|
|
||||||
|
//adding the 3rd interval overlapping with the 1st and the 2nd interval
|
||||||
|
//110MB from 25s to 80s
|
||||||
|
bytes = (long)110 * 1024 * 1024;
|
||||||
|
scheduler.copySucceeded(attemptID2, new MapHost(null, null), bytes, 25000, 80000, output);
|
||||||
|
Assert.assertEquals("copy task(attempt_test_0000_m_000002_2 succeeded at 2.00 MB/s)"
|
||||||
|
+ " Aggregated copy rate(3 of 10 at 2.00 MB/s)", progress.toString());
|
||||||
|
|
||||||
|
TaskAttemptID attemptID3 = new TaskAttemptID(
|
||||||
|
new org.apache.hadoop.mapred.TaskID(
|
||||||
|
new JobID("test",0), TaskType.MAP, 3), 3);
|
||||||
|
|
||||||
|
//adding the 4th interval just after the 2nd interval, 100MB from 100s to 300s
|
||||||
|
bytes = (long)100 * 1024 * 1024;
|
||||||
|
scheduler.copySucceeded(attemptID3, new MapHost(null, null), bytes, 100000, 300000, output);
|
||||||
|
Assert.assertEquals("copy task(attempt_test_0000_m_000003_3 succeeded at 0.50 MB/s)"
|
||||||
|
+ " Aggregated copy rate(4 of 10 at 1.00 MB/s)", progress.toString());
|
||||||
|
|
||||||
|
TaskAttemptID attemptID4 = new TaskAttemptID(
|
||||||
|
new org.apache.hadoop.mapred.TaskID(
|
||||||
|
new JobID("test",0), TaskType.MAP, 4), 4);
|
||||||
|
|
||||||
|
//adding the 5th interval between after 4th, 50MB from 350s to 400s
|
||||||
|
bytes = (long)50 * 1024 * 1024;
|
||||||
|
scheduler.copySucceeded(attemptID4, new MapHost(null, null), bytes, 350000, 400000, output);
|
||||||
|
Assert.assertEquals("copy task(attempt_test_0000_m_000004_4 succeeded at 1.00 MB/s)"
|
||||||
|
+ " Aggregated copy rate(5 of 10 at 1.00 MB/s)", progress.toString());
|
||||||
|
|
||||||
|
|
||||||
|
TaskAttemptID attemptID5 = new TaskAttemptID(
|
||||||
|
new org.apache.hadoop.mapred.TaskID(
|
||||||
|
new JobID("test",0), TaskType.MAP, 5), 5);
|
||||||
|
//adding the 6th interval between after 5th, 50MB from 450s to 500s
|
||||||
|
bytes = (long)50 * 1024 * 1024;
|
||||||
|
scheduler.copySucceeded(attemptID5, new MapHost(null, null), bytes, 450000, 500000, output);
|
||||||
|
Assert.assertEquals("copy task(attempt_test_0000_m_000005_5 succeeded at 1.00 MB/s)"
|
||||||
|
+ " Aggregated copy rate(6 of 10 at 1.00 MB/s)", progress.toString());
|
||||||
|
|
||||||
|
TaskAttemptID attemptID6 = new TaskAttemptID(
|
||||||
|
new org.apache.hadoop.mapred.TaskID(
|
||||||
|
new JobID("test",0), TaskType.MAP, 6), 6);
|
||||||
|
//adding the 7th interval between after 5th and 6th interval, 20MB from 320s to 340s
|
||||||
|
bytes = (long)20 * 1024 * 1024;
|
||||||
|
scheduler.copySucceeded(attemptID6, new MapHost(null, null), bytes, 320000, 340000, output);
|
||||||
|
Assert.assertEquals("copy task(attempt_test_0000_m_000006_6 succeeded at 1.00 MB/s)"
|
||||||
|
+ " Aggregated copy rate(7 of 10 at 1.00 MB/s)", progress.toString());
|
||||||
|
|
||||||
|
TaskAttemptID attemptID7 = new TaskAttemptID(
|
||||||
|
new org.apache.hadoop.mapred.TaskID(
|
||||||
|
new JobID("test",0), TaskType.MAP, 7), 7);
|
||||||
|
//adding the 8th interval overlapping with 4th, 5th, and 7th 30MB from 290s to 350s
|
||||||
|
bytes = (long)30 * 1024 * 1024;
|
||||||
|
scheduler.copySucceeded(attemptID7, new MapHost(null, null), bytes, 290000, 350000, output);
|
||||||
|
Assert.assertEquals("copy task(attempt_test_0000_m_000007_7 succeeded at 0.50 MB/s)"
|
||||||
|
+ " Aggregated copy rate(8 of 10 at 1.00 MB/s)", progress.toString());
|
||||||
|
|
||||||
|
TaskAttemptID attemptID8 = new TaskAttemptID(
|
||||||
|
new org.apache.hadoop.mapred.TaskID(
|
||||||
|
new JobID("test",0), TaskType.MAP, 8), 8);
|
||||||
|
//adding the 9th interval overlapping with 5th and 6th, 50MB from 400s to 450s
|
||||||
|
bytes = (long)50 * 1024 * 1024;
|
||||||
|
scheduler.copySucceeded(attemptID8, new MapHost(null, null), bytes, 400000, 450000, output);
|
||||||
|
Assert.assertEquals("copy task(attempt_test_0000_m_000008_8 succeeded at 1.00 MB/s)"
|
||||||
|
+ " Aggregated copy rate(9 of 10 at 1.00 MB/s)", progress.toString());
|
||||||
|
|
||||||
|
TaskAttemptID attemptID9 = new TaskAttemptID(
|
||||||
|
new org.apache.hadoop.mapred.TaskID(
|
||||||
|
new JobID("test",0), TaskType.MAP, 9), 9);
|
||||||
|
//adding the 10th interval overlapping with all intervals, 500MB from 0s to 500s
|
||||||
|
bytes = (long)500 * 1024 * 1024;
|
||||||
|
scheduler.copySucceeded(attemptID9, new MapHost(null, null), bytes, 0, 500000, output);
|
||||||
|
Assert.assertEquals("copy task(attempt_test_0000_m_000009_9 succeeded at 1.00 MB/s)"
|
||||||
|
+ " Aggregated copy rate(10 of 10 at 2.00 MB/s)", progress.toString());
|
||||||
|
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
Loading…
Reference in New Issue