diff --git a/hadoop-mapreduce-project/CHANGES.txt b/hadoop-mapreduce-project/CHANGES.txt
index 2cc69e6c78d..620ccb84a33 100644
--- a/hadoop-mapreduce-project/CHANGES.txt
+++ b/hadoop-mapreduce-project/CHANGES.txt
@@ -757,6 +757,9 @@ Release 0.23.3 - UNRELEASED
     MAPREDUCE-4467. IndexCache failures due to missing synchronization
     (Kihwal Lee via tgraves)
 
+    MAPREDUCE-4423. Potential infinite fetching of map output (Robert Evans
+    via tgraves)
+
 Release 0.23.2 - UNRELEASED
 
   INCOMPATIBLE CHANGES
 
diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/task/reduce/Fetcher.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/task/reduce/Fetcher.java
index 64bc43e51b6..6c527ae1ce5 100644
--- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/task/reduce/Fetcher.java
+++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/task/reduce/Fetcher.java
@@ -21,11 +21,12 @@ import java.io.DataInputStream;
 import java.io.IOException;
 import java.io.InputStream;
 import java.io.OutputStream;
+import java.net.HttpURLConnection;
 import java.net.MalformedURLException;
 import java.net.URL;
-import java.net.HttpURLConnection;
 import java.net.URLConnection;
 import java.security.GeneralSecurityException;
+import java.util.Arrays;
 import java.util.HashSet;
 import java.util.List;
 import java.util.Set;
@@ -53,7 +54,8 @@ import org.apache.hadoop.mapreduce.task.reduce.MapOutput.Type;
 import org.apache.hadoop.util.Progressable;
 import org.apache.hadoop.util.ReflectionUtils;
 
-@SuppressWarnings({"deprecation"})
+import com.google.common.annotations.VisibleForTesting;
+
 class Fetcher<K,V> extends Thread {
   
   private static final Log LOG = LogFactory.getLog(Fetcher.class);
@@ -199,6 +201,7 @@ class Fetcher<K,V> extends Thread {
     }
   }
 
+  @VisibleForTesting
   protected HttpURLConnection openConnection(URL url) throws IOException {
     HttpURLConnection conn = (HttpURLConnection) url.openConnection();
     if (sslShuffle) {
@@ -209,17 +212,18 @@
         throw new IOException(ex);
       }
       httpsConn.setHostnameVerifier(sslFactory.getHostnameVerifier());
-    }
-    return conn;
   }
-  
+    return conn;
+  }
+
   /**
    * The crux of the matter...
    *
    * @param host {@link MapHost} from which we need to
    *              shuffle available map-outputs.
    */
-  private void copyFromHost(MapHost host) throws IOException {
+  @VisibleForTesting
+  protected void copyFromHost(MapHost host) throws IOException {
     // Get completed maps on 'host'
     List<TaskAttemptID> maps = scheduler.getMapsForHost(host);
 
@@ -229,9 +233,9 @@ class Fetcher<K,V> extends Thread {
       return;
     }
 
-    LOG.debug("Fetcher " + id + " going to fetch from " + host);
-    for (TaskAttemptID tmp: maps) {
-      LOG.debug(tmp);
+    if(LOG.isDebugEnabled()) {
+      LOG.debug("Fetcher " + id + " going to fetch from " + host + " for: "
+          + maps);
     }
 
     // List of maps to be fetched yet
@@ -304,17 +308,25 @@ class Fetcher<K,V> extends Thread {
 
     try {
       // Loop through available map-outputs and fetch them
-      // On any error, good becomes false and we exit after putting back
-      // the remaining maps to the yet_to_be_fetched list
-      boolean good = true;
-      while (!remaining.isEmpty() && good) {
-        good = copyMapOutput(host, input, remaining);
+      // On any error, failedTasks is not null and we exit
+      // after putting back the remaining maps to the
+      // yet_to_be_fetched list and marking the failed tasks.
+      TaskAttemptID[] failedTasks = null;
+      while (!remaining.isEmpty() && failedTasks == null) {
+        failedTasks = copyMapOutput(host, input, remaining);
+      }
+
+      if(failedTasks != null && failedTasks.length > 0) {
+        LOG.warn("copyMapOutput failed for tasks "+Arrays.toString(failedTasks));
+        for(TaskAttemptID left: failedTasks) {
+          scheduler.copyFailed(left, host, true);
+        }
+      }
 
       IOUtils.cleanup(LOG, input);
 
       // Sanity check
-      if (good && !remaining.isEmpty()) {
+      if (failedTasks == null && !remaining.isEmpty()) {
         throw new IOException("server didn't return all expected map outputs: "
             + remaining.size() + " left.");
       }
@@ -323,10 +335,11 @@ class Fetcher<K,V> extends Thread {
         scheduler.putBackKnownMapOutput(host, left);
       }
     }
-    
-  }
+  }
 
-  private boolean copyMapOutput(MapHost host,
+  private static TaskAttemptID[] EMPTY_ATTEMPT_ID_ARRAY = new TaskAttemptID[0];
+
+  private TaskAttemptID[] copyMapOutput(MapHost host,
                                 DataInputStream input,
                                 Set<TaskAttemptID> remaining) {
     MapOutput<K,V> mapOutput = null;
@@ -348,18 +361,21 @@
     } catch (IllegalArgumentException e) {
       badIdErrs.increment(1);
       LOG.warn("Invalid map id ", e);
-      return false;
+      //Don't know which one was bad, so consider all of them as bad
+      return remaining.toArray(new TaskAttemptID[remaining.size()]);
     }
 
     // Do some basic sanity verification
     if (!verifySanity(compressedLength, decompressedLength, forReduce,
         remaining, mapId)) {
-      return false;
+      return new TaskAttemptID[] {mapId};
     }
 
-    LOG.debug("header: " + mapId + ", len: " + compressedLength + 
-        ", decomp len: " + decompressedLength);
+    if(LOG.isDebugEnabled()) {
+      LOG.debug("header: " + mapId + ", len: " + compressedLength + 
+          ", decomp len: " + decompressedLength);
+    }
 
     // Get the location for the map output - either in-memory or on-disk
     mapOutput = merger.reserve(mapId, decompressedLength, id);
 
@@ -367,7 +383,8 @@
     // Check if we can shuffle *now* ...
     if (mapOutput.getType() == Type.WAIT) {
       LOG.info("fetcher#" + id + " - MergerManager returned Status.WAIT ...");
-      return false;
+      //Not an error but wait to process data.
+      return EMPTY_ATTEMPT_ID_ARRAY;
     }
 
     // Go!
@@ -389,24 +406,27 @@
       // Note successful shuffle
       remaining.remove(mapId);
       metrics.successFetch();
-      return true;
+      return null;
     } catch (IOException ioe) {
       ioErrs.increment(1);
       if (mapId == null || mapOutput == null) {
         LOG.info("fetcher#" + id + " failed to read map header" + 
                  mapId + " decomp: " + 
                  decompressedLength + ", " + compressedLength, ioe);
-        return false;
+        if(mapId == null) {
+          return remaining.toArray(new TaskAttemptID[remaining.size()]);
+        } else {
+          return new TaskAttemptID[] {mapId};
+        }
       }
 
-      LOG.info("Failed to shuffle output of " + mapId + 
+      LOG.warn("Failed to shuffle output of " + mapId + 
                " from " + host.getHostName(), ioe);
 
       // Inform the shuffle-scheduler
       mapOutput.abort();
-      scheduler.copyFailed(mapId, host, true);
       metrics.failedFetch();
-      return false;
+      return new TaskAttemptID[] {mapId};
     }
   }
 
diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapreduce/task/reduce/TestFetcher.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapreduce/task/reduce/TestFetcher.java
new file mode 100644
index 00000000000..097f120dd21
--- /dev/null
+++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapreduce/task/reduce/TestFetcher.java
@@ -0,0 +1,188 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.mapreduce.task.reduce;
+
+import static org.mockito.Matchers.*;
+import static org.mockito.Mockito.*;
+
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.net.HttpURLConnection;
+import java.net.URL;
+import java.util.ArrayList;
+
+import javax.crypto.SecretKey;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapred.Counters;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.Reporter;
+import org.apache.hadoop.mapreduce.TaskAttemptID;
+import org.apache.hadoop.mapreduce.security.SecureShuffleUtils;
+import org.apache.hadoop.mapreduce.security.token.JobTokenSecretManager;
+import org.junit.Test;
+
+/**
+ * Test that the Fetcher does what we expect it to.
+ */
+public class TestFetcher {
+  private static final Log LOG = LogFactory.getLog(TestFetcher.class);
+
+  public static class FakeFetcher<K,V> extends Fetcher<K,V> {
+
+    private HttpURLConnection connection;
+
+    public FakeFetcher(JobConf job, TaskAttemptID reduceId,
+        ShuffleScheduler<K,V> scheduler, MergeManager<K,V> merger, Reporter reporter,
+        ShuffleClientMetrics metrics, ExceptionReporter exceptionReporter,
+        SecretKey jobTokenSecret, HttpURLConnection connection) {
+      super(job, reduceId, scheduler, merger, reporter, metrics, exceptionReporter,
+          jobTokenSecret);
+      this.connection = connection;
+    }
+
+    @Override
+    protected HttpURLConnection openConnection(URL url) throws IOException {
+      if(connection != null) {
+        return connection;
+      }
+      return super.openConnection(url);
+    }
+  }
+
+  @SuppressWarnings("unchecked")
+  @Test
+  public void testCopyFromHostBogusHeader() throws Exception {
+    LOG.info("testCopyFromHostBogusHeader");
+    JobConf job = new JobConf();
+    TaskAttemptID id = TaskAttemptID.forName("attempt_0_1_r_1_1");
+    ShuffleScheduler<Text, Text> ss = mock(ShuffleScheduler.class);
+    MergeManager<Text, Text> mm = mock(MergeManager.class);
+    Reporter r = mock(Reporter.class);
+    ShuffleClientMetrics metrics = mock(ShuffleClientMetrics.class);
+    ExceptionReporter except = mock(ExceptionReporter.class);
+    SecretKey key = JobTokenSecretManager.createSecretKey(new byte[]{0,0,0,0});
+    HttpURLConnection connection = mock(HttpURLConnection.class);
+
+    Counters.Counter allErrs = mock(Counters.Counter.class);
+    when(r.getCounter(anyString(), anyString()))
+        .thenReturn(allErrs);
+
+    Fetcher<Text,Text> underTest = new FakeFetcher<Text,Text>(job, id, ss, mm,
+        r, metrics, except, key, connection);
+
+    MapHost host = new MapHost("localhost", "http://localhost:8080/");
+
+    ArrayList<TaskAttemptID> maps = new ArrayList<TaskAttemptID>(1);
+    TaskAttemptID map1ID = TaskAttemptID.forName("attempt_0_1_m_1_1");
+    maps.add(map1ID);
+    TaskAttemptID map2ID = TaskAttemptID.forName("attempt_0_1_m_2_1");
+    maps.add(map2ID);
+    when(ss.getMapsForHost(host)).thenReturn(maps);
+
+    String encHash = "vFE234EIFCiBgYs2tCXY/SjT8Kg=";
+    String replyHash = SecureShuffleUtils.generateHash(encHash.getBytes(), key);
+
+    when(connection.getResponseCode()).thenReturn(200);
+    when(connection.getHeaderField(SecureShuffleUtils.HTTP_HEADER_REPLY_URL_HASH))
+        .thenReturn(replyHash);
+    ByteArrayInputStream in = new ByteArrayInputStream(
+        "\u00010 BOGUS DATA\nBOGUS DATA\nBOGUS DATA\n".getBytes());
+    when(connection.getInputStream()).thenReturn(in);
+
+    underTest.copyFromHost(host);
+
+    verify(connection)
+        .addRequestProperty(SecureShuffleUtils.HTTP_HEADER_URL_HASH,
+            encHash);
+
+    verify(allErrs).increment(1);
+    verify(ss).copyFailed(map1ID, host, true);
+    verify(ss).copyFailed(map2ID, host, true);
+
+    verify(ss).putBackKnownMapOutput(any(MapHost.class), eq(map1ID));
+    verify(ss).putBackKnownMapOutput(any(MapHost.class), eq(map2ID));
+  }
+
+  @SuppressWarnings("unchecked")
+  @Test
+  public void testCopyFromHostWait() throws Exception {
+    LOG.info("testCopyFromHostWait");
+    JobConf job = new JobConf();
+    TaskAttemptID id = TaskAttemptID.forName("attempt_0_1_r_1_1");
+    ShuffleScheduler<Text, Text> ss = mock(ShuffleScheduler.class);
+    MergeManager<Text, Text> mm = mock(MergeManager.class);
+    Reporter r = mock(Reporter.class);
+    ShuffleClientMetrics metrics = mock(ShuffleClientMetrics.class);
+    ExceptionReporter except = mock(ExceptionReporter.class);
+    SecretKey key = JobTokenSecretManager.createSecretKey(new byte[]{0,0,0,0});
+    HttpURLConnection connection = mock(HttpURLConnection.class);
+
+    Counters.Counter allErrs = mock(Counters.Counter.class);
+    when(r.getCounter(anyString(), anyString()))
+        .thenReturn(allErrs);
+
+    Fetcher<Text,Text> underTest = new FakeFetcher<Text,Text>(job, id, ss, mm,
+        r, metrics, except, key, connection);
+
+    MapHost host = new MapHost("localhost", "http://localhost:8080/");
+
+    ArrayList<TaskAttemptID> maps = new ArrayList<TaskAttemptID>(1);
+    TaskAttemptID map1ID = TaskAttemptID.forName("attempt_0_1_m_1_1");
+    maps.add(map1ID);
+    TaskAttemptID map2ID = TaskAttemptID.forName("attempt_0_1_m_2_1");
+    maps.add(map2ID);
+    when(ss.getMapsForHost(host)).thenReturn(maps);
+
+    String encHash = "vFE234EIFCiBgYs2tCXY/SjT8Kg=";
+    String replyHash = SecureShuffleUtils.generateHash(encHash.getBytes(), key);
+
+    when(connection.getResponseCode()).thenReturn(200);
+    when(connection.getHeaderField(SecureShuffleUtils.HTTP_HEADER_REPLY_URL_HASH))
+        .thenReturn(replyHash);
+    ShuffleHeader header = new ShuffleHeader(map1ID.toString(), 10, 10, 1);
+    ByteArrayOutputStream bout = new ByteArrayOutputStream();
+    header.write(new DataOutputStream(bout));
+    ByteArrayInputStream in = new ByteArrayInputStream(bout.toByteArray());
+    when(connection.getInputStream()).thenReturn(in);
+    //Defaults to WAIT, which is what we want to test
+    MapOutput<Text, Text> mapOut = new MapOutput<Text, Text>(map1ID);
+    when(mm.reserve(any(TaskAttemptID.class), anyLong(), anyInt()))
+        .thenReturn(mapOut);
+
+    underTest.copyFromHost(host);
+
+    verify(connection)
+        .addRequestProperty(SecureShuffleUtils.HTTP_HEADER_URL_HASH,
+            encHash);
+    verify(allErrs, never()).increment(1);
+    verify(ss, never()).copyFailed(map1ID, host, true);
+    verify(ss, never()).copyFailed(map2ID, host, true);
+
+    verify(ss).putBackKnownMapOutput(any(MapHost.class), eq(map1ID));
+    verify(ss).putBackKnownMapOutput(any(MapHost.class), eq(map2ID));
+  }
+
+}
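
Reviewer note (not part of the patch): the fix changes copyMapOutput's contract from a boolean to a TaskAttemptID[] so that copyFromHost, not the helper, reports failures to the scheduler. A null result means the copy succeeded, the shared EMPTY_ATTEMPT_ID_ARRAY means a benign merger WAIT that must not be counted as a failure, and a non-empty array names exactly the attempts to mark failed before the remainder is re-queued. The toy program below is a self-contained sketch of that control flow using simplified, hypothetical names (FetchLoopSketch, FakeScheduler, simulateCopy, plain String ids in place of TaskAttemptIDs); it compiles independently of Hadoop and is illustrative only.

import java.util.Arrays;
import java.util.HashSet;
import java.util.Set;

public class FetchLoopSketch {
  // Stand-in for the two ShuffleScheduler calls the patch exercises.
  static class FakeScheduler {
    void copyFailed(String id)            { System.out.println("copyFailed: " + id); }
    void putBackKnownMapOutput(String id) { System.out.println("putBack: " + id); }
  }

  // Toy copyMapOutput: null = success, non-empty array = the failed attempts.
  // (A real WAIT would return a shared empty array instead, which the caller
  // treats as "stop looping, but mark nothing failed".)
  static String[] simulateCopy(Set<String> remaining, boolean injectError) {
    String id = remaining.iterator().next();
    if (injectError) {
      return new String[] { id };  // name exactly the attempt that failed
    }
    remaining.remove(id);          // successful shuffle
    return null;
  }

  public static void main(String[] args) {
    FakeScheduler scheduler = new FakeScheduler();
    Set<String> remaining =
        new HashSet<String>(Arrays.asList("map_1", "map_2", "map_3"));

    // The loop exits on the first non-null result instead of a boolean flag.
    String[] failedTasks = null;
    boolean injectError = true;    // flip to false to see the success path
    while (!remaining.isEmpty() && failedTasks == null) {
      failedTasks = simulateCopy(remaining, injectError);
    }

    // The caller, not the helper, marks the failures. This is the key change:
    // before the patch some error paths returned false without ever calling
    // copyFailed, so the same outputs could be re-fetched forever.
    if (failedTasks != null && failedTasks.length > 0) {
      System.out.println("failed: " + Arrays.toString(failedTasks));
      for (String left : failedTasks) {
        scheduler.copyFailed(left);
      }
    }

    // Everything not successfully fetched is re-queued, failed attempts included,
    // matching the putBackKnownMapOutput verifications in TestFetcher.
    for (String left : remaining) {
      scheduler.putBackKnownMapOutput(left);
    }
  }
}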