Merge -c 1523660 from trunk to branch-2 to fix MAPREDUCE-5493. Cleanup in-memory & on-disk segments to prevent leak on shuffle completion. Contributed by Jason Lowe.

git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/branches/branch-2@1523661 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Arun Murthy 2013-09-16 14:19:33 +00:00
parent c061a0da55
commit 73c69decf1
3 changed files with 12 additions and 1 deletions

View File

@ -130,6 +130,9 @@ Release 2.1.1-beta - UNRELEASED
MAPREDUCE-5164. mapred job and queue commands omit HADOOP_CLIENT_OPTS MAPREDUCE-5164. mapred job and queue commands omit HADOOP_CLIENT_OPTS
(Nemon Lou via devaraj) (Nemon Lou via devaraj)
MAPREDUCE-5493. Cleanup in-memory & on-disk segments to prevent leak on
shuffle completion. (jlowe via acmurthy)
Release 2.1.0-beta - 2013-08-22 Release 2.1.0-beta - 2013-08-22
INCOMPATIBLE CHANGES INCOMPATIBLE CHANGES

View File

@ -355,8 +355,11 @@ public class MergeManagerImpl<K, V> implements MergeManager<K, V> {
List<InMemoryMapOutput<K, V>> memory = List<InMemoryMapOutput<K, V>> memory =
new ArrayList<InMemoryMapOutput<K, V>>(inMemoryMergedMapOutputs); new ArrayList<InMemoryMapOutput<K, V>>(inMemoryMergedMapOutputs);
inMemoryMergedMapOutputs.clear();
memory.addAll(inMemoryMapOutputs); memory.addAll(inMemoryMapOutputs);
inMemoryMapOutputs.clear();
List<CompressAwarePath> disk = new ArrayList<CompressAwarePath>(onDiskMapOutputs); List<CompressAwarePath> disk = new ArrayList<CompressAwarePath>(onDiskMapOutputs);
onDiskMapOutputs.clear();
return finalMerge(jobConf, rfs, memory, disk); return finalMerge(jobConf, rfs, memory, disk);
} }

View File

@ -82,7 +82,7 @@ public class TestMerger {
} }
@Test @Test
public void testInMemoryMerger() throws IOException { public void testInMemoryMerger() throws Throwable {
JobID jobId = new JobID("a", 0); JobID jobId = new JobID("a", 0);
TaskAttemptID reduceId = new TaskAttemptID( TaskAttemptID reduceId = new TaskAttemptID(
new TaskID(jobId, TaskType.REDUCE, 0), 0); new TaskID(jobId, TaskType.REDUCE, 0), 0);
@ -132,6 +132,11 @@ public class TestMerger {
readOnDiskMapOutput(conf, fs, outPath, keys, values); readOnDiskMapOutput(conf, fs, outPath, keys, values);
Assert.assertEquals(keys, Arrays.asList("apple", "banana", "carrot")); Assert.assertEquals(keys, Arrays.asList("apple", "banana", "carrot"));
Assert.assertEquals(values, Arrays.asList("disgusting", "pretty good", "delicious")); Assert.assertEquals(values, Arrays.asList("disgusting", "pretty good", "delicious"));
mergeManager.close();
Assert.assertEquals(0, mergeManager.inMemoryMapOutputs.size());
Assert.assertEquals(0, mergeManager.inMemoryMergedMapOutputs.size());
Assert.assertEquals(0, mergeManager.onDiskMapOutputs.size());
} }
private byte[] writeMapOutput(Configuration conf, Map<String, String> keysToValues) private byte[] writeMapOutput(Configuration conf, Map<String, String> keysToValues)