MAPREDUCE-6334. Fetcher#copyMapOutput is leaking usedMemory upon IOException during InMemoryMapOutput shuffle handler. Contributed by Eric Payne

(cherry picked from commit bc1bd7e5c4)
This commit is contained in:
Jason Lowe 2015-04-28 20:17:52 +00:00
parent 6b20b325f4
commit 30d0f10458
3 changed files with 41 additions and 2 deletions

View File

@ -23,6 +23,9 @@ Release 2.7.1 - UNRELEASED
MAPREDUCE-6252. JobHistoryServer should not fail when encountering a MAPREDUCE-6252. JobHistoryServer should not fail when encountering a
missing directory. (Craig Welch via devaraj) missing directory. (Craig Welch via devaraj)
MAPREDUCE-6334. Fetcher#copyMapOutput is leaking usedMemory upon
IOException during InMemoryMapOutput shuffle handler (Eric Payne via jlowe)
Release 2.7.0 - 2015-04-20 Release 2.7.0 - 2015-04-20
INCOMPATIBLE CHANGES INCOMPATIBLE CHANGES

View File

@ -553,7 +553,10 @@ class Fetcher<K,V> extends Thread {
metrics.successFetch(); metrics.successFetch();
return null; return null;
} catch (IOException ioe) { } catch (IOException ioe) {
if (mapOutput != null) {
mapOutput.abort();
}
if (canRetry) { if (canRetry) {
checkTimeoutOrRetry(host, ioe); checkTimeoutOrRetry(host, ioe);
} }
@ -574,7 +577,6 @@ class Fetcher<K,V> extends Thread {
" from " + host.getHostName(), ioe); " from " + host.getHostName(), ioe);
// Inform the shuffle-scheduler // Inform the shuffle-scheduler
mapOutput.abort();
metrics.failedFetch(); metrics.failedFetch();
return new TaskAttemptID[] {mapId}; return new TaskAttemptID[] {mapId};
} }

View File

@ -628,6 +628,40 @@ public class TestFetcher {
verify(odmo).abort(); verify(odmo).abort();
} }
@SuppressWarnings("unchecked")
@Test(timeout=10000)
public void testCopyFromHostWithRetryUnreserve() throws Exception {
InMemoryMapOutput<Text, Text> immo = mock(InMemoryMapOutput.class);
Fetcher<Text,Text> underTest = new FakeFetcher<Text,Text>(jobWithRetry,
id, ss, mm, r, metrics, except, key, connection);
String replyHash = SecureShuffleUtils.generateHash(encHash.getBytes(), key);
when(connection.getResponseCode()).thenReturn(200);
when(connection.getHeaderField(SecureShuffleUtils.HTTP_HEADER_REPLY_URL_HASH))
.thenReturn(replyHash);
ShuffleHeader header = new ShuffleHeader(map1ID.toString(), 10, 10, 1);
ByteArrayOutputStream bout = new ByteArrayOutputStream();
header.write(new DataOutputStream(bout));
ByteArrayInputStream in = new ByteArrayInputStream(bout.toByteArray());
when(connection.getInputStream()).thenReturn(in);
when(connection.getHeaderField(ShuffleHeader.HTTP_HEADER_NAME))
.thenReturn(ShuffleHeader.DEFAULT_HTTP_HEADER_NAME);
when(connection.getHeaderField(ShuffleHeader.HTTP_HEADER_VERSION))
.thenReturn(ShuffleHeader.DEFAULT_HTTP_HEADER_VERSION);
// Verify that unreserve occurs if an exception happens after shuffle
// buffer is reserved.
when(mm.reserve(any(TaskAttemptID.class), anyLong(), anyInt()))
.thenReturn(immo);
doThrow(new IOException("forced error")).when(immo).shuffle(
any(MapHost.class), any(InputStream.class), anyLong(),
anyLong(), any(ShuffleClientMetrics.class), any(Reporter.class));
underTest.copyFromHost(host);
verify(immo).abort();
}
public static class FakeFetcher<K,V> extends Fetcher<K,V> { public static class FakeFetcher<K,V> extends Fetcher<K,V> {
// If connection need to be reopen. // If connection need to be reopen.