MAPREDUCE-5493. Cleanup in-memory & on-disk segments to prevent leak on shuffle completion. Contributed by Jason Lowe.

git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/trunk@1523660 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Arun Murthy 2013-09-16 14:18:42 +00:00
parent 60a1a70197
commit 59587d9fad
3 changed files with 12 additions and 1 deletions

View File

@ -267,6 +267,9 @@ Release 2.1.1-beta - UNRELEASED
MAPREDUCE-5164. mapred job and queue commands omit HADOOP_CLIENT_OPTS MAPREDUCE-5164. mapred job and queue commands omit HADOOP_CLIENT_OPTS
(Nemon Lou via devaraj) (Nemon Lou via devaraj)
MAPREDUCE-5493. Cleanup in-memory & on-disk segments to prevent leak on
shuffle completion. (jlowe via acmurthy)
Release 2.1.0-beta - 2013-08-22 Release 2.1.0-beta - 2013-08-22
INCOMPATIBLE CHANGES INCOMPATIBLE CHANGES

View File

@ -355,8 +355,11 @@ public RawKeyValueIterator close() throws Throwable {
List<InMemoryMapOutput<K, V>> memory = List<InMemoryMapOutput<K, V>> memory =
new ArrayList<InMemoryMapOutput<K, V>>(inMemoryMergedMapOutputs); new ArrayList<InMemoryMapOutput<K, V>>(inMemoryMergedMapOutputs);
inMemoryMergedMapOutputs.clear();
memory.addAll(inMemoryMapOutputs); memory.addAll(inMemoryMapOutputs);
inMemoryMapOutputs.clear();
List<CompressAwarePath> disk = new ArrayList<CompressAwarePath>(onDiskMapOutputs); List<CompressAwarePath> disk = new ArrayList<CompressAwarePath>(onDiskMapOutputs);
onDiskMapOutputs.clear();
return finalMerge(jobConf, rfs, memory, disk); return finalMerge(jobConf, rfs, memory, disk);
} }

View File

@ -82,7 +82,7 @@ public void cleanup() throws IOException {
} }
@Test @Test
public void testInMemoryMerger() throws IOException { public void testInMemoryMerger() throws Throwable {
JobID jobId = new JobID("a", 0); JobID jobId = new JobID("a", 0);
TaskAttemptID reduceId = new TaskAttemptID( TaskAttemptID reduceId = new TaskAttemptID(
new TaskID(jobId, TaskType.REDUCE, 0), 0); new TaskID(jobId, TaskType.REDUCE, 0), 0);
@ -132,6 +132,11 @@ public void testInMemoryMerger() throws IOException {
readOnDiskMapOutput(conf, fs, outPath, keys, values); readOnDiskMapOutput(conf, fs, outPath, keys, values);
Assert.assertEquals(keys, Arrays.asList("apple", "banana", "carrot")); Assert.assertEquals(keys, Arrays.asList("apple", "banana", "carrot"));
Assert.assertEquals(values, Arrays.asList("disgusting", "pretty good", "delicious")); Assert.assertEquals(values, Arrays.asList("disgusting", "pretty good", "delicious"));
mergeManager.close();
Assert.assertEquals(0, mergeManager.inMemoryMapOutputs.size());
Assert.assertEquals(0, mergeManager.inMemoryMergedMapOutputs.size());
Assert.assertEquals(0, mergeManager.onDiskMapOutputs.size());
} }
private byte[] writeMapOutput(Configuration conf, Map<String, String> keysToValues) private byte[] writeMapOutput(Configuration conf, Map<String, String> keysToValues)