MAPREDUCE-6169. MergeQueue should release reference to the current item from key and value at the end of the iteration to save memory. (Zhihai Xu via kasha)
This commit is contained in:
parent
eb4045e765
commit
90194ca1cb
|
@ -237,6 +237,10 @@ Release 2.7.0 - UNRELEASED
|
||||||
|
|
||||||
OPTIMIZATIONS
|
OPTIMIZATIONS
|
||||||
|
|
||||||
|
MAPREDUCE-6169. MergeQueue should release reference to the current item
|
||||||
|
from key and value at the end of the iteration to save memory.
|
||||||
|
(Zhihai Xu via kasha)
|
||||||
|
|
||||||
BUG FIXES
|
BUG FIXES
|
||||||
|
|
||||||
MAPREDUCE-5918. LineRecordReader can return the same decompressor to
|
MAPREDUCE-5918. LineRecordReader can return the same decompressor to
|
||||||
|
|
|
@ -528,9 +528,17 @@ public class Merger {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
private void resetKeyValue() {
|
||||||
|
key = null;
|
||||||
|
value.reset(new byte[] {}, 0);
|
||||||
|
diskIFileValue.reset(new byte[] {}, 0);
|
||||||
|
}
|
||||||
|
|
||||||
public boolean next() throws IOException {
|
public boolean next() throws IOException {
|
||||||
if (size() == 0)
|
if (size() == 0) {
|
||||||
|
resetKeyValue();
|
||||||
return false;
|
return false;
|
||||||
|
}
|
||||||
|
|
||||||
if (minSegment != null) {
|
if (minSegment != null) {
|
||||||
//minSegment is non-null for all invocations of next except the first
|
//minSegment is non-null for all invocations of next except the first
|
||||||
|
@ -539,6 +547,7 @@ public class Merger {
|
||||||
adjustPriorityQueue(minSegment);
|
adjustPriorityQueue(minSegment);
|
||||||
if (size() == 0) {
|
if (size() == 0) {
|
||||||
minSegment = null;
|
minSegment = null;
|
||||||
|
resetKeyValue();
|
||||||
return false;
|
return false;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -294,6 +294,8 @@ public class TestMerger {
|
||||||
// Now there should be no more input
|
// Now there should be no more input
|
||||||
Assert.assertFalse(mergeQueue.next());
|
Assert.assertFalse(mergeQueue.next());
|
||||||
Assert.assertEquals(1.0f, mergeQueue.getProgress().get(), epsilon);
|
Assert.assertEquals(1.0f, mergeQueue.getProgress().get(), epsilon);
|
||||||
|
Assert.assertTrue(mergeQueue.getKey() == null);
|
||||||
|
Assert.assertEquals(0, mergeQueue.getValue().getData().length);
|
||||||
}
|
}
|
||||||
|
|
||||||
private Progressable getReporter() {
|
private Progressable getReporter() {
|
||||||
|
|
Loading…
Reference in New Issue