diff --git a/hadoop-mapreduce-project/CHANGES.txt b/hadoop-mapreduce-project/CHANGES.txt index 8639a6f7625..5df9c136f7b 100644 --- a/hadoop-mapreduce-project/CHANGES.txt +++ b/hadoop-mapreduce-project/CHANGES.txt @@ -739,6 +739,9 @@ Release 0.23.3 - UNRELEASED MAPREDUCE-4448. Fix NM crash during app cleanup if aggregation didn't init. (Jason Lowe via daryn) + MAPREDUCE-4423. Potential infinite fetching of map output (Robert Evans + via tgraves) + Release 0.23.2 - UNRELEASED INCOMPATIBLE CHANGES diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/task/reduce/Fetcher.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/task/reduce/Fetcher.java index f3e7fd61c27..7852da93a52 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/task/reduce/Fetcher.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/task/reduce/Fetcher.java @@ -49,7 +49,8 @@ import org.apache.hadoop.mapreduce.task.reduce.MapOutput.Type; import org.apache.hadoop.util.Progressable; import org.apache.hadoop.util.ReflectionUtils; -@SuppressWarnings({"deprecation"}) +import com.google.common.annotations.VisibleForTesting; + class Fetcher extends Thread { private static final Log LOG = LogFactory.getLog(Fetcher.class); @@ -175,13 +176,18 @@ class Fetcher extends Thread { } } + @VisibleForTesting + protected HttpURLConnection openConnection(URL url) throws IOException { + return (HttpURLConnection)url.openConnection(); + } + /** * The crux of the matter... * * @param host {@link MapHost} from which we need to * shuffle available map-outputs. */ - private void copyFromHost(MapHost host) throws IOException { + protected void copyFromHost(MapHost host) throws IOException { // Get completed maps on 'host' List maps = scheduler.getMapsForHost(host); @@ -191,9 +197,11 @@ class Fetcher extends Thread { return; } - LOG.debug("Fetcher " + id + " going to fetch from " + host); - for (TaskAttemptID tmp: maps) { - LOG.debug(tmp); + if(LOG.isDebugEnabled()) { + LOG.debug("Fetcher " + id + " going to fetch from " + host); + for (TaskAttemptID tmp: maps) { + LOG.debug(tmp); + } } // List of maps to be fetched yet @@ -205,7 +213,7 @@ class Fetcher extends Thread { try { URL url = getMapOutputURL(host, maps); - HttpURLConnection connection = (HttpURLConnection)url.openConnection(); + HttpURLConnection connection = openConnection(url); // generate hash of the url String msgToEncode = SecureShuffleUtils.buildMsgFrom(url); @@ -266,17 +274,24 @@ class Fetcher extends Thread { try { // Loop through available map-outputs and fetch them - // On any error, good becomes false and we exit after putting back - // the remaining maps to the yet_to_be_fetched list - boolean good = true; - while (!remaining.isEmpty() && good) { - good = copyMapOutput(host, input, remaining); + // On any error, faildTasks is not null and we exit + // after putting back the remaining maps to the + // yet_to_be_fetched list and marking the failed tasks. + TaskAttemptID[] failedTasks = null; + while (!remaining.isEmpty() && failedTasks == null) { + failedTasks = copyMapOutput(host, input, remaining); + } + + if(failedTasks != null) { + for(TaskAttemptID left: failedTasks) { + scheduler.copyFailed(left, host, true); + } } IOUtils.cleanup(LOG, input); // Sanity check - if (good && !remaining.isEmpty()) { + if (failedTasks == null && !remaining.isEmpty()) { throw new IOException("server didn't return all expected map outputs: " + remaining.size() + " left."); } @@ -285,10 +300,9 @@ class Fetcher extends Thread { scheduler.putBackKnownMapOutput(host, left); } } - - } + } - private boolean copyMapOutput(MapHost host, + private TaskAttemptID[] copyMapOutput(MapHost host, DataInputStream input, Set remaining) { MapOutput mapOutput = null; @@ -310,14 +324,15 @@ class Fetcher extends Thread { } catch (IllegalArgumentException e) { badIdErrs.increment(1); LOG.warn("Invalid map id ", e); - return false; + //Don't know which one was bad, so consider all of them as bad + return remaining.toArray(new TaskAttemptID[remaining.size()]); } // Do some basic sanity verification if (!verifySanity(compressedLength, decompressedLength, forReduce, remaining, mapId)) { - return false; + return new TaskAttemptID[] {mapId}; } LOG.debug("header: " + mapId + ", len: " + compressedLength + @@ -329,7 +344,7 @@ class Fetcher extends Thread { // Check if we can shuffle *now* ... if (mapOutput.getType() == Type.WAIT) { LOG.info("fetcher#" + id + " - MergerManager returned Status.WAIT ..."); - return false; + return new TaskAttemptID[] {mapId}; } // Go! @@ -351,14 +366,18 @@ class Fetcher extends Thread { // Note successful shuffle remaining.remove(mapId); metrics.successFetch(); - return true; + return null; } catch (IOException ioe) { ioErrs.increment(1); if (mapId == null || mapOutput == null) { LOG.info("fetcher#" + id + " failed to read map header" + mapId + " decomp: " + decompressedLength + ", " + compressedLength, ioe); - return false; + if(mapId == null) { + return remaining.toArray(new TaskAttemptID[remaining.size()]); + } else { + return new TaskAttemptID[] {mapId}; + } } LOG.info("Failed to shuffle output of " + mapId + @@ -366,9 +385,8 @@ class Fetcher extends Thread { // Inform the shuffle-scheduler mapOutput.abort(); - scheduler.copyFailed(mapId, host, true); metrics.failedFetch(); - return false; + return new TaskAttemptID[] {mapId}; } } diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapreduce/task/reduce/TestFetcher.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapreduce/task/reduce/TestFetcher.java new file mode 100644 index 00000000000..7f7c3f50b87 --- /dev/null +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapreduce/task/reduce/TestFetcher.java @@ -0,0 +1,121 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.mapreduce.task.reduce; + +import static org.mockito.Matchers.anyString; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.when; + +import java.io.ByteArrayInputStream; +import java.io.IOException; +import java.net.HttpURLConnection; +import java.net.URL; +import java.util.ArrayList; + +import javax.crypto.SecretKey; + +import org.apache.hadoop.io.Text; +import org.apache.hadoop.mapred.Counters; +import org.apache.hadoop.mapred.JobConf; +import org.apache.hadoop.mapred.Reporter; +import org.apache.hadoop.mapreduce.TaskAttemptID; +import org.apache.hadoop.mapreduce.security.SecureShuffleUtils; +import org.apache.hadoop.mapreduce.security.token.JobTokenSecretManager; +import org.junit.Test; + +/** + * Test that the Fetcher does what we expect it to. + */ +public class TestFetcher { + + public static class FakeFetcher extends Fetcher { + + private HttpURLConnection connection; + + public FakeFetcher(JobConf job, TaskAttemptID reduceId, + ShuffleScheduler scheduler, MergeManager merger, Reporter reporter, + ShuffleClientMetrics metrics, ExceptionReporter exceptionReporter, + SecretKey jobTokenSecret, HttpURLConnection connection) { + super(job, reduceId, scheduler, merger, reporter, metrics, exceptionReporter, + jobTokenSecret); + this.connection = connection; + } + + @Override + protected HttpURLConnection openConnection(URL url) throws IOException { + if(connection != null) { + return connection; + } + return super.openConnection(url); + } + } + + @SuppressWarnings("unchecked") + @Test + public void testCopyFromHostBogusHeader() throws Exception { + JobConf job = new JobConf(); + TaskAttemptID id = TaskAttemptID.forName("attempt_0_1_r_1_1"); + ShuffleScheduler ss = mock(ShuffleScheduler.class); + MergeManager mm = mock(MergeManager.class); + Reporter r = mock(Reporter.class); + ShuffleClientMetrics metrics = mock(ShuffleClientMetrics.class); + ExceptionReporter except = mock(ExceptionReporter.class); + SecretKey key = JobTokenSecretManager.createSecretKey(new byte[]{0,0,0,0}); + HttpURLConnection connection = mock(HttpURLConnection.class); + + Counters.Counter allErrs = mock(Counters.Counter.class); + when(r.getCounter(anyString(), anyString())) + .thenReturn(allErrs); + + Fetcher underTest = new FakeFetcher(job, id, ss, mm, + r, metrics, except, key, connection); + + + MapHost host = new MapHost("localhost", "http://localhost:8080/"); + + ArrayList maps = new ArrayList(1); + TaskAttemptID map1ID = TaskAttemptID.forName("attempt_0_1_m_1_1"); + maps.add(map1ID); + TaskAttemptID map2ID = TaskAttemptID.forName("attempt_0_1_m_2_1"); + maps.add(map2ID); + when(ss.getMapsForHost(host)).thenReturn(maps); + + String encHash = "vFE234EIFCiBgYs2tCXY/SjT8Kg="; + String replyHash = SecureShuffleUtils.generateHash(encHash.getBytes(), key); + + when(connection.getResponseCode()).thenReturn(200); + when(connection.getHeaderField(SecureShuffleUtils.HTTP_HEADER_REPLY_URL_HASH)) + .thenReturn(replyHash); + ByteArrayInputStream in = new ByteArrayInputStream( + "5 BOGUS DATA\nBOGUS DATA\nBOGUS DATA\n".getBytes()); + when(connection.getInputStream()).thenReturn(in); + + underTest.copyFromHost(host); + + verify(connection) + .addRequestProperty(SecureShuffleUtils.HTTP_HEADER_URL_HASH, + encHash); + + verify(allErrs).increment(1); + verify(ss).copyFailed(map1ID, host, true); + verify(ss).copyFailed(map2ID, host, true); + } + +}