MAPREDUCE-6199. AbstractCounters are not reset completely on deserialization (adhoot via rkanter)
(cherry picked from commit 390a7c12f5
)
This commit is contained in:
parent
8ee40a1580
commit
f9341c1e2c
|
@ -58,6 +58,9 @@ Release 2.7.0 - UNRELEASED
|
|||
MAPREDUCE-6045. need close the DataInputStream after open it in
|
||||
TestMapReduce.java (zxu via rkanter)
|
||||
|
||||
MAPREDUCE-6199. AbstractCounters are not reset completely on
|
||||
deserialization (adhoot via rkanter)
|
||||
|
||||
Release 2.6.0 - 2014-11-18
|
||||
|
||||
INCOMPATIBLE CHANGES
|
||||
|
|
|
@ -307,6 +307,10 @@ public abstract class AbstractCounters<C extends Counter,
|
|||
fgroups.put(group.getName(), group);
|
||||
}
|
||||
int numGroups = WritableUtils.readVInt(in);
|
||||
if (!groups.isEmpty()) {
|
||||
groups.clear();
|
||||
limits.reset();
|
||||
}
|
||||
while (numGroups-- > 0) {
|
||||
limits.checkGroups(groups.size() + 1);
|
||||
G group = groupFactory.newGenericGroup(
|
||||
|
|
|
@ -124,8 +124,15 @@ public class Limits {
|
|||
return firstViolation;
|
||||
}
|
||||
|
||||
// This allows initialization of global settings and not for an instance
|
||||
public static synchronized void reset(Configuration conf) {
|
||||
isInited = false;
|
||||
init(conf);
|
||||
}
|
||||
|
||||
// This allows resetting of an instance to allow reuse
|
||||
public synchronized void reset() {
|
||||
totalCounters = 0;
|
||||
firstViolation = null;
|
||||
}
|
||||
}
|
||||
|
|
|
@ -17,8 +17,12 @@
|
|||
*/
|
||||
package org.apache.hadoop.mapreduce;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.util.Random;
|
||||
|
||||
import org.apache.hadoop.conf.Configuration;
|
||||
import org.apache.hadoop.io.DataInputBuffer;
|
||||
import org.apache.hadoop.io.DataOutputBuffer;
|
||||
import org.junit.Test;
|
||||
import static org.junit.Assert.*;
|
||||
|
||||
|
@ -70,7 +74,40 @@ public class TestCounters {
|
|||
testMaxGroups(new Counters());
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
@Test public void testResetOnDeserialize() throws IOException {
|
||||
// Allow only one counterGroup
|
||||
Configuration conf = new Configuration();
|
||||
conf.setInt(MRJobConfig.COUNTER_GROUPS_MAX_KEY, 1);
|
||||
Limits.init(conf);
|
||||
|
||||
Counters countersWithOneGroup = new Counters();
|
||||
countersWithOneGroup.findCounter("firstOf1Allowed", "First group");
|
||||
boolean caughtExpectedException = false;
|
||||
try {
|
||||
countersWithOneGroup.findCounter("secondIsTooMany", "Second group");
|
||||
}
|
||||
catch (LimitExceededException _) {
|
||||
caughtExpectedException = true;
|
||||
}
|
||||
|
||||
assertTrue("Did not throw expected exception",
|
||||
caughtExpectedException);
|
||||
|
||||
Counters countersWithZeroGroups = new Counters();
|
||||
DataOutputBuffer out = new DataOutputBuffer();
|
||||
countersWithZeroGroups.write(out);
|
||||
|
||||
DataInputBuffer in = new DataInputBuffer();
|
||||
in.reset(out.getData(), out.getLength());
|
||||
|
||||
countersWithOneGroup.readFields(in);
|
||||
|
||||
// After reset one should be able to add a group
|
||||
countersWithOneGroup.findCounter("firstGroupAfterReset", "After reset " +
|
||||
"limit should be set back to zero");
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testCountersIncrement() {
|
||||
Counters fCounters = new Counters();
|
||||
|
|
Loading…
Reference in New Issue