diff --git a/hadoop-mapreduce-project/CHANGES.txt b/hadoop-mapreduce-project/CHANGES.txt index dc63500b51f..37e92cb2d9d 100644 --- a/hadoop-mapreduce-project/CHANGES.txt +++ b/hadoop-mapreduce-project/CHANGES.txt @@ -429,6 +429,9 @@ Release 2.6.0 - UNRELEASED MAPREDUCE-6115. TestPipeApplication#testSubmitter fails in trunk (Binglin Chang via jlowe) + MAPREDUCE-5873. Shuffle bandwidth computation includes time spent waiting + for maps (Siqi Li via jlowe) + Release 2.5.1 - 2014-09-05 INCOMPATIBLE CHANGES diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/task/reduce/Fetcher.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/task/reduce/Fetcher.java index a41620058cf..796394f20e8 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/task/reduce/Fetcher.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/task/reduce/Fetcher.java @@ -544,7 +544,7 @@ class Fetcher extends Thread { retryStartTime = 0; scheduler.copySucceeded(mapId, host, compressedLength, - endTime - startTime, mapOutput); + startTime, endTime, mapOutput); // Note successful shuffle remaining.remove(mapId); metrics.successFetch(); diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/task/reduce/LocalFetcher.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/task/reduce/LocalFetcher.java index 98256c2d65b..6794c99c820 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/task/reduce/LocalFetcher.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/task/reduce/LocalFetcher.java @@ -162,7 +162,7 @@ class LocalFetcher extends Fetcher { } } - scheduler.copySucceeded(mapTaskId, LOCALHOST, compressedLength, 0, + scheduler.copySucceeded(mapTaskId, LOCALHOST, compressedLength, 0, 0, mapOutput); return true; // successful fetch. } diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/task/reduce/ShuffleSchedulerImpl.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/task/reduce/ShuffleSchedulerImpl.java index e48a73a0c12..985a1e1f164 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/task/reduce/ShuffleSchedulerImpl.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/task/reduce/ShuffleSchedulerImpl.java @@ -23,6 +23,7 @@ import java.net.URI; import java.net.UnknownHostException; import java.text.DecimalFormat; import java.util.ArrayList; +import java.util.Collections; import java.util.HashMap; import java.util.HashSet; import java.util.Iterator; @@ -64,7 +65,8 @@ public class ShuffleSchedulerImpl implements ShuffleScheduler { private static final long INITIAL_PENALTY = 10000; private static final float PENALTY_GROWTH_RATE = 1.3f; private final static int REPORT_FAILURE_LIMIT = 10; - + private static final float BYTES_PER_MILLIS_TO_MBS = 1000f / 1024 / 1024; + private final boolean[] finishedMaps; private final int totalMaps; @@ -92,6 +94,8 @@ public class ShuffleSchedulerImpl implements ShuffleScheduler { private final long startTime; private long lastProgressTime; + private final CopyTimeTracker copyTimeTracker; + private volatile int maxMapRuntime = 0; private final int maxFailedUniqueFetches; private final int maxFetchFailuresBeforeReporting; @@ -112,7 +116,7 @@ public class ShuffleSchedulerImpl implements ShuffleScheduler { Counters.Counter failedShuffleCounter) { totalMaps = job.getNumMapTasks(); abortFailureLimit = Math.max(30, totalMaps / 10); - + copyTimeTracker = new CopyTimeTracker(); remainingMaps = totalMaps; finishedMaps = new boolean[remainingMaps]; this.reporter = reporter; @@ -180,7 +184,8 @@ public class ShuffleSchedulerImpl implements ShuffleScheduler { public synchronized void copySucceeded(TaskAttemptID mapId, MapHost host, long bytes, - long millis, + long startMillis, + long endMillis, MapOutput output ) throws IOException { failureCounts.remove(mapId); @@ -195,29 +200,48 @@ public class ShuffleSchedulerImpl implements ShuffleScheduler { notifyAll(); } - // update the status + // update single copy task status + long copyMillis = (endMillis - startMillis); + if (copyMillis == 0) copyMillis = 1; + float bytesPerMillis = (float) bytes / copyMillis; + float transferRate = bytesPerMillis * BYTES_PER_MILLIS_TO_MBS; + String individualProgress = "copy task(" + mapId + " succeeded" + + " at " + mbpsFormat.format(transferRate) + " MB/s)"; + // update the aggregated status + copyTimeTracker.add(startMillis, endMillis); + totalBytesShuffledTillNow += bytes; - updateStatus(); + updateStatus(individualProgress); reduceShuffleBytes.increment(bytes); lastProgressTime = Time.monotonicNow(); LOG.debug("map " + mapId + " done " + status.getStateString()); } } - private void updateStatus() { - float mbs = (float) totalBytesShuffledTillNow / (1024 * 1024); + private synchronized void updateStatus(String individualProgress) { int mapsDone = totalMaps - remainingMaps; - long secsSinceStart = (Time.monotonicNow() - startTime) / 1000 + 1; - - float transferRate = mbs / secsSinceStart; + long totalCopyMillis = copyTimeTracker.getCopyMillis(); + if (totalCopyMillis == 0) totalCopyMillis = 1; + float bytesPerMillis = (float) totalBytesShuffledTillNow / totalCopyMillis; + float transferRate = bytesPerMillis * BYTES_PER_MILLIS_TO_MBS; progress.set((float) mapsDone / totalMaps); String statusString = mapsDone + " / " + totalMaps + " copied."; status.setStateString(statusString); - progress.setStatus("copy(" + mapsDone + " of " + totalMaps + " at " - + mbpsFormat.format(transferRate) + " MB/s)"); + if (individualProgress != null) { + progress.setStatus(individualProgress + " Aggregated copy rate(" + + mapsDone + " of " + totalMaps + " at " + + mbpsFormat.format(transferRate) + " MB/s)"); + } else { + progress.setStatus("copy(" + mapsDone + " of " + totalMaps + " at " + + mbpsFormat.format(transferRate) + " MB/s)"); + } } + private void updateStatus() { + updateStatus(null); + } + public synchronized void hostFailed(String hostname) { if (hostFailures.containsKey(hostname)) { IntWritable x = hostFailures.get(hostname); @@ -520,4 +544,63 @@ public class ShuffleSchedulerImpl implements ShuffleScheduler { public int getMaxHostFailures() { return maxHostFailures; } + + private static class CopyTimeTracker { + List intervals; + long copyMillis; + public CopyTimeTracker() { + intervals = Collections.emptyList(); + copyMillis = 0; + } + public void add(long s, long e) { + Interval interval = new Interval(s, e); + copyMillis = getTotalCopyMillis(interval); + } + + public long getCopyMillis() { + return copyMillis; + } + // This method captures the time during which any copy was in progress + // each copy time period is record in the Interval list + private long getTotalCopyMillis(Interval newInterval) { + if (newInterval == null) { + return copyMillis; + } + List result = new ArrayList(intervals.size() + 1); + for (Interval interval: intervals) { + if (interval.end < newInterval.start) { + result.add(interval); + } else if (interval.start > newInterval.end) { + result.add(newInterval); + newInterval = interval; + } else { + newInterval = new Interval( + Math.min(interval.start, newInterval.start), + Math.max(newInterval.end, interval.end)); + } + } + result.add(newInterval); + intervals = result; + + //compute total millis + long length = 0; + for (Interval interval : intervals) { + length += interval.getIntervalLength(); + } + return length; + } + + private static class Interval { + final long start; + final long end; + public Interval(long s, long e) { + start = s; + end = e; + } + + public long getIntervalLength() { + return end - start; + } + } + } } diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapreduce/task/reduce/TestShuffleScheduler.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapreduce/task/reduce/TestShuffleScheduler.java index 355a419426f..905fd443fa9 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapreduce/task/reduce/TestShuffleScheduler.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapreduce/task/reduce/TestShuffleScheduler.java @@ -17,9 +17,20 @@ */ package org.apache.hadoop.mapreduce.task.reduce; +import static org.mockito.Mockito.mock; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.LocalDirAllocator; +import org.apache.hadoop.io.compress.CompressionCodec; import org.apache.hadoop.mapred.JobConf; +import org.apache.hadoop.mapred.MapOutputFile; +import org.apache.hadoop.mapred.Reporter; +import org.apache.hadoop.mapred.ShuffleConsumerPlugin; +import org.apache.hadoop.mapred.Task; import org.apache.hadoop.mapred.TaskAttemptID; import org.apache.hadoop.mapred.TaskStatus; +import org.apache.hadoop.mapred.TaskUmbilicalProtocol; +import org.apache.hadoop.mapred.Counters.Counter; +import org.apache.hadoop.mapred.Task.CombineOutputCollector; import org.apache.hadoop.mapreduce.JobID; import org.apache.hadoop.mapreduce.TaskID; import org.apache.hadoop.mapreduce.TaskType; @@ -66,4 +77,150 @@ public class TestShuffleScheduler { 0.0f); Assert.assertTrue(scheduler.waitUntilDone(1)); } + + @SuppressWarnings("rawtypes") + @Test + public void TestAggregatedTransferRate() throws Exception { + JobConf job = new JobConf(); + job.setNumMapTasks(10); + //mock creation + TaskUmbilicalProtocol mockUmbilical = mock(TaskUmbilicalProtocol.class); + Reporter mockReporter = mock(Reporter.class); + FileSystem mockFileSystem = mock(FileSystem.class); + Class combinerClass = job.getCombinerClass(); + @SuppressWarnings("unchecked") // needed for mock with generic + CombineOutputCollector mockCombineOutputCollector = + (CombineOutputCollector) mock(CombineOutputCollector.class); + org.apache.hadoop.mapreduce.TaskAttemptID mockTaskAttemptID = + mock(org.apache.hadoop.mapreduce.TaskAttemptID.class); + LocalDirAllocator mockLocalDirAllocator = mock(LocalDirAllocator.class); + CompressionCodec mockCompressionCodec = mock(CompressionCodec.class); + Counter mockCounter = mock(Counter.class); + TaskStatus mockTaskStatus = mock(TaskStatus.class); + Progress mockProgress = mock(Progress.class); + MapOutputFile mockMapOutputFile = mock(MapOutputFile.class); + Task mockTask = mock(Task.class); + @SuppressWarnings("unchecked") + MapOutput output = mock(MapOutput.class); + + ShuffleConsumerPlugin.Context context = + new ShuffleConsumerPlugin.Context(mockTaskAttemptID, job, mockFileSystem, + mockUmbilical, mockLocalDirAllocator, + mockReporter, mockCompressionCodec, + combinerClass, mockCombineOutputCollector, + mockCounter, mockCounter, mockCounter, + mockCounter, mockCounter, mockCounter, + mockTaskStatus, mockProgress, mockProgress, + mockTask, mockMapOutputFile, null); + TaskStatus status = new TaskStatus() { + @Override + public boolean getIsMap() { + return false; + } + @Override + public void addFetchFailedMap(TaskAttemptID mapTaskId) { + } + }; + Progress progress = new Progress(); + ShuffleSchedulerImpl scheduler = new ShuffleSchedulerImpl(job, status, null, + null, progress, context.getShuffledMapsCounter(), + context.getReduceShuffleBytes(), context.getFailedShuffleCounter()); + TaskAttemptID attemptID0 = new TaskAttemptID( + new org.apache.hadoop.mapred.TaskID( + new JobID("test",0), TaskType.MAP, 0), 0); + + //adding the 1st interval, 40MB from 60s to 100s + long bytes = (long)40 * 1024 * 1024; + scheduler.copySucceeded(attemptID0, new MapHost(null, null), bytes, 60000, 100000, output); + Assert.assertEquals("copy task(attempt_test_0000_m_000000_0 succeeded at 1.00 MB/s)" + + " Aggregated copy rate(1 of 10 at 1.00 MB/s)", progress.toString()); + + TaskAttemptID attemptID1 = new TaskAttemptID( + new org.apache.hadoop.mapred.TaskID( + new JobID("test",0), TaskType.MAP, 1), 1); + + //adding the 2nd interval before the 1st interval, 50MB from 0s to 50s + bytes = (long)50 * 1024 * 1024; + scheduler.copySucceeded(attemptID1, new MapHost(null, null), bytes, 0, 50000, output); + Assert.assertEquals("copy task(attempt_test_0000_m_000001_1 succeeded at 1.00 MB/s)" + + " Aggregated copy rate(2 of 10 at 1.00 MB/s)", progress.toString()); + + TaskAttemptID attemptID2 = new TaskAttemptID( + new org.apache.hadoop.mapred.TaskID( + new JobID("test",0), TaskType.MAP, 2), 2); + + //adding the 3rd interval overlapping with the 1st and the 2nd interval + //110MB from 25s to 80s + bytes = (long)110 * 1024 * 1024; + scheduler.copySucceeded(attemptID2, new MapHost(null, null), bytes, 25000, 80000, output); + Assert.assertEquals("copy task(attempt_test_0000_m_000002_2 succeeded at 2.00 MB/s)" + + " Aggregated copy rate(3 of 10 at 2.00 MB/s)", progress.toString()); + + TaskAttemptID attemptID3 = new TaskAttemptID( + new org.apache.hadoop.mapred.TaskID( + new JobID("test",0), TaskType.MAP, 3), 3); + + //adding the 4th interval just after the 2nd interval, 100MB from 100s to 300s + bytes = (long)100 * 1024 * 1024; + scheduler.copySucceeded(attemptID3, new MapHost(null, null), bytes, 100000, 300000, output); + Assert.assertEquals("copy task(attempt_test_0000_m_000003_3 succeeded at 0.50 MB/s)" + + " Aggregated copy rate(4 of 10 at 1.00 MB/s)", progress.toString()); + + TaskAttemptID attemptID4 = new TaskAttemptID( + new org.apache.hadoop.mapred.TaskID( + new JobID("test",0), TaskType.MAP, 4), 4); + + //adding the 5th interval between after 4th, 50MB from 350s to 400s + bytes = (long)50 * 1024 * 1024; + scheduler.copySucceeded(attemptID4, new MapHost(null, null), bytes, 350000, 400000, output); + Assert.assertEquals("copy task(attempt_test_0000_m_000004_4 succeeded at 1.00 MB/s)" + + " Aggregated copy rate(5 of 10 at 1.00 MB/s)", progress.toString()); + + + TaskAttemptID attemptID5 = new TaskAttemptID( + new org.apache.hadoop.mapred.TaskID( + new JobID("test",0), TaskType.MAP, 5), 5); + //adding the 6th interval between after 5th, 50MB from 450s to 500s + bytes = (long)50 * 1024 * 1024; + scheduler.copySucceeded(attemptID5, new MapHost(null, null), bytes, 450000, 500000, output); + Assert.assertEquals("copy task(attempt_test_0000_m_000005_5 succeeded at 1.00 MB/s)" + + " Aggregated copy rate(6 of 10 at 1.00 MB/s)", progress.toString()); + + TaskAttemptID attemptID6 = new TaskAttemptID( + new org.apache.hadoop.mapred.TaskID( + new JobID("test",0), TaskType.MAP, 6), 6); + //adding the 7th interval between after 5th and 6th interval, 20MB from 320s to 340s + bytes = (long)20 * 1024 * 1024; + scheduler.copySucceeded(attemptID6, new MapHost(null, null), bytes, 320000, 340000, output); + Assert.assertEquals("copy task(attempt_test_0000_m_000006_6 succeeded at 1.00 MB/s)" + + " Aggregated copy rate(7 of 10 at 1.00 MB/s)", progress.toString()); + + TaskAttemptID attemptID7 = new TaskAttemptID( + new org.apache.hadoop.mapred.TaskID( + new JobID("test",0), TaskType.MAP, 7), 7); + //adding the 8th interval overlapping with 4th, 5th, and 7th 30MB from 290s to 350s + bytes = (long)30 * 1024 * 1024; + scheduler.copySucceeded(attemptID7, new MapHost(null, null), bytes, 290000, 350000, output); + Assert.assertEquals("copy task(attempt_test_0000_m_000007_7 succeeded at 0.50 MB/s)" + + " Aggregated copy rate(8 of 10 at 1.00 MB/s)", progress.toString()); + + TaskAttemptID attemptID8 = new TaskAttemptID( + new org.apache.hadoop.mapred.TaskID( + new JobID("test",0), TaskType.MAP, 8), 8); + //adding the 9th interval overlapping with 5th and 6th, 50MB from 400s to 450s + bytes = (long)50 * 1024 * 1024; + scheduler.copySucceeded(attemptID8, new MapHost(null, null), bytes, 400000, 450000, output); + Assert.assertEquals("copy task(attempt_test_0000_m_000008_8 succeeded at 1.00 MB/s)" + + " Aggregated copy rate(9 of 10 at 1.00 MB/s)", progress.toString()); + + TaskAttemptID attemptID9 = new TaskAttemptID( + new org.apache.hadoop.mapred.TaskID( + new JobID("test",0), TaskType.MAP, 9), 9); + //adding the 10th interval overlapping with all intervals, 500MB from 0s to 500s + bytes = (long)500 * 1024 * 1024; + scheduler.copySucceeded(attemptID9, new MapHost(null, null), bytes, 0, 500000, output); + Assert.assertEquals("copy task(attempt_test_0000_m_000009_9 succeeded at 1.00 MB/s)" + + " Aggregated copy rate(10 of 10 at 2.00 MB/s)", progress.toString()); + + } }