From 30d0f10458fbe0e3a85ea2e22a1bb3d4454fd896 Mon Sep 17 00:00:00 2001 From: Jason Lowe Date: Tue, 28 Apr 2015 20:17:52 +0000 Subject: [PATCH] MAPREDUCE-6334. Fetcher#copyMapOutput is leaking usedMemory upon IOException during InMemoryMapOutput shuffle handler. Contributed by Eric Payne (cherry picked from commit bc1bd7e5c4047b374420683d36a8c30eda6d75b6) --- hadoop-mapreduce-project/CHANGES.txt | 3 ++ .../hadoop/mapreduce/task/reduce/Fetcher.java | 6 ++-- .../mapreduce/task/reduce/TestFetcher.java | 34 +++++++++++++++++++ 3 files changed, 41 insertions(+), 2 deletions(-) diff --git a/hadoop-mapreduce-project/CHANGES.txt b/hadoop-mapreduce-project/CHANGES.txt index 85c6fea67f2..10491f9b93d 100644 --- a/hadoop-mapreduce-project/CHANGES.txt +++ b/hadoop-mapreduce-project/CHANGES.txt @@ -23,6 +23,9 @@ Release 2.7.1 - UNRELEASED MAPREDUCE-6252. JobHistoryServer should not fail when encountering a missing directory. (Craig Welch via devaraj) + MAPREDUCE-6334. Fetcher#copyMapOutput is leaking usedMemory upon + IOException during InMemoryMapOutput shuffle handler (Eric Payne via jlowe) + Release 2.7.0 - 2015-04-20 INCOMPATIBLE CHANGES diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/task/reduce/Fetcher.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/task/reduce/Fetcher.java index d867e4b406d..4b80dc99de6 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/task/reduce/Fetcher.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/task/reduce/Fetcher.java @@ -553,7 +553,10 @@ class Fetcher extends Thread { metrics.successFetch(); return null; } catch (IOException ioe) { - + if (mapOutput != null) { + mapOutput.abort(); + } + if (canRetry) { checkTimeoutOrRetry(host, ioe); } @@ -574,7 +577,6 @@ class Fetcher extends Thread { " from " + host.getHostName(), ioe); // Inform the shuffle-scheduler - mapOutput.abort(); metrics.failedFetch(); return new TaskAttemptID[] {mapId}; } diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapreduce/task/reduce/TestFetcher.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapreduce/task/reduce/TestFetcher.java index 723df175356..a9cd33ea950 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapreduce/task/reduce/TestFetcher.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapreduce/task/reduce/TestFetcher.java @@ -628,6 +628,40 @@ public class TestFetcher { verify(odmo).abort(); } + @SuppressWarnings("unchecked") + @Test(timeout=10000) + public void testCopyFromHostWithRetryUnreserve() throws Exception { + InMemoryMapOutput immo = mock(InMemoryMapOutput.class); + Fetcher underTest = new FakeFetcher(jobWithRetry, + id, ss, mm, r, metrics, except, key, connection); + + String replyHash = SecureShuffleUtils.generateHash(encHash.getBytes(), key); + + when(connection.getResponseCode()).thenReturn(200); + when(connection.getHeaderField(SecureShuffleUtils.HTTP_HEADER_REPLY_URL_HASH)) + .thenReturn(replyHash); + ShuffleHeader header = new ShuffleHeader(map1ID.toString(), 10, 10, 1); + ByteArrayOutputStream bout = new ByteArrayOutputStream(); + header.write(new DataOutputStream(bout)); + ByteArrayInputStream in = new ByteArrayInputStream(bout.toByteArray()); + when(connection.getInputStream()).thenReturn(in); + when(connection.getHeaderField(ShuffleHeader.HTTP_HEADER_NAME)) + .thenReturn(ShuffleHeader.DEFAULT_HTTP_HEADER_NAME); + when(connection.getHeaderField(ShuffleHeader.HTTP_HEADER_VERSION)) + .thenReturn(ShuffleHeader.DEFAULT_HTTP_HEADER_VERSION); + + // Verify that unreserve occurs if an exception happens after shuffle + // buffer is reserved. + when(mm.reserve(any(TaskAttemptID.class), anyLong(), anyInt())) + .thenReturn(immo); + doThrow(new IOException("forced error")).when(immo).shuffle( + any(MapHost.class), any(InputStream.class), anyLong(), + anyLong(), any(ShuffleClientMetrics.class), any(Reporter.class)); + + underTest.copyFromHost(host); + verify(immo).abort(); + } + public static class FakeFetcher extends Fetcher { // If connection need to be reopen.