diff --git a/hadoop-mapreduce-project/CHANGES.txt b/hadoop-mapreduce-project/CHANGES.txt index d0fe1e69517..92288caf71b 100644 --- a/hadoop-mapreduce-project/CHANGES.txt +++ b/hadoop-mapreduce-project/CHANGES.txt @@ -36,6 +36,9 @@ Release 2.3.0 - UNRELEASED MAPREDUCE-5317. Stale files left behind for failed jobs (Ravi Prakash via jlowe) + MAPREDUCE-5251. Reducer should not implicate map attempt if it has + insufficient space to fetch map output (Ashwin Shankar via jlowe) + Release 2.1.1-beta - UNRELEASED INCOMPATIBLE CHANGES @@ -1112,6 +1115,9 @@ Release 0.23.10 - UNRELEASED MAPREDUCE-5317. Stale files left behind for failed jobs (Ravi Prakash via jlowe) + MAPREDUCE-5251. Reducer should not implicate map attempt if it has + insufficient space to fetch map output (Ashwin Shankar via jlowe) + Release 0.23.9 - 2013-07-08 INCOMPATIBLE CHANGES diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/task/reduce/Fetcher.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/task/reduce/Fetcher.java index 4f1e5d14efa..b1c4dc7ef82 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/task/reduce/Fetcher.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/task/reduce/Fetcher.java @@ -407,7 +407,14 @@ class Fetcher extends Thread { } // Get the location for the map output - either in-memory or on-disk - mapOutput = merger.reserve(mapId, decompressedLength, id); + try { + mapOutput = merger.reserve(mapId, decompressedLength, id); + } catch (IOException ioe) { + // kill this reduce attempt + ioErrs.increment(1); + scheduler.reportLocalError(ioe); + return EMPTY_ATTEMPT_ID_ARRAY; + } // Check if we can shuffle *now* ... if (mapOutput == null) { diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/task/reduce/ShuffleSchedulerImpl.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/task/reduce/ShuffleSchedulerImpl.java index 76affb234be..6f9b222bdcf 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/task/reduce/ShuffleSchedulerImpl.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/task/reduce/ShuffleSchedulerImpl.java @@ -19,7 +19,9 @@ package org.apache.hadoop.mapreduce.task.reduce; import java.io.IOException; +import java.net.InetAddress; import java.net.URI; +import java.net.UnknownHostException; import java.text.DecimalFormat; import java.util.ArrayList; import java.util.HashMap; @@ -252,6 +254,16 @@ public class ShuffleSchedulerImpl implements ShuffleScheduler { failedShuffleCounter.increment(1); } + + public void reportLocalError(IOException ioe) { + try { + LOG.error("Shuffle failed : local error on this node: " + + InetAddress.getLocalHost()); + } catch (UnknownHostException e) { + LOG.error("Shuffle failed : local error on this node"); + } + reporter.reportException(ioe); + } // Notify the JobTracker // after every read error, if 'reportReadErrorImmediately' is true or diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapreduce/task/reduce/TestFetcher.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapreduce/task/reduce/TestFetcher.java index 59839408613..07b6f62593a 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapreduce/task/reduce/TestFetcher.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapreduce/task/reduce/TestFetcher.java @@ -58,6 +58,7 @@ import org.apache.hadoop.mapred.Reporter; import org.apache.hadoop.mapreduce.TaskAttemptID; import org.apache.hadoop.mapreduce.security.SecureShuffleUtils; import org.apache.hadoop.mapreduce.security.token.JobTokenSecretManager; +import org.apache.hadoop.util.DiskChecker.DiskErrorException; import org.junit.Test; import org.mockito.invocation.InvocationOnMock; @@ -114,6 +115,36 @@ public class TestFetcher { LOG.info("<<<< " + name.getMethodName()); } + @Test + public void testReduceOutOfDiskSpace() throws Throwable { + LOG.info("testReduceOutOfDiskSpace"); + + Fetcher underTest = new FakeFetcher(job, id, ss, mm, + r, metrics, except, key, connection); + + String replyHash = SecureShuffleUtils.generateHash(encHash.getBytes(), key); + ShuffleHeader header = new ShuffleHeader(map1ID.toString(), 10, 10, 1); + ByteArrayOutputStream bout = new ByteArrayOutputStream(); + header.write(new DataOutputStream(bout)); + + ByteArrayInputStream in = new ByteArrayInputStream(bout.toByteArray()); + + when(connection.getResponseCode()).thenReturn(200); + when(connection.getHeaderField(ShuffleHeader.HTTP_HEADER_NAME)) + .thenReturn(ShuffleHeader.DEFAULT_HTTP_HEADER_NAME); + when(connection.getHeaderField(ShuffleHeader.HTTP_HEADER_VERSION)) + .thenReturn(ShuffleHeader.DEFAULT_HTTP_HEADER_VERSION); + when(connection.getHeaderField(SecureShuffleUtils.HTTP_HEADER_REPLY_URL_HASH)) + .thenReturn(replyHash); + when(connection.getInputStream()).thenReturn(in); + + when(mm.reserve(any(TaskAttemptID.class), anyLong(), anyInt())) + .thenThrow(new DiskErrorException("No disk space available")); + + underTest.copyFromHost(host); + verify(ss).reportLocalError(any(IOException.class)); + } + @Test(timeout=30000) public void testCopyFromHostConnectionTimeout() throws Exception { when(connection.getInputStream()).thenThrow(