MAPREDUCE-5831. Make MR client ignore unknown counters received from AM. Contributed by Junping Du.
(cherry picked from commit 662fc11ae7
)
This commit is contained in:
parent
0eaa4ba026
commit
7b3ecc547f
|
@ -158,6 +158,9 @@ Release 2.6.0 - UNRELEASED
|
|||
|
||||
MAPREDUCE-6093. minor distcp doc edits (Charles Lamb via aw)
|
||||
|
||||
MAPREDUCE-5831. Make MR client ignore unknown counters received from AM.
|
||||
(Junping Du via zjshen)
|
||||
|
||||
Release 2.5.1 - 2014-09-05
|
||||
|
||||
INCOMPATIBLE CHANGES
|
||||
|
|
|
@ -241,9 +241,12 @@ public class TypeConverter {
|
|||
org.apache.hadoop.mapreduce.Counter c =
|
||||
counters.findCounter(yGrp.getName(),
|
||||
yCntr.getName());
|
||||
// if c can be found, or it will be skipped.
|
||||
if (c != null) {
|
||||
c.setValue(yCntr.getValue());
|
||||
}
|
||||
}
|
||||
}
|
||||
return counters;
|
||||
}
|
||||
|
||||
|
|
|
@ -34,6 +34,8 @@ import com.google.common.collect.AbstractIterator;
|
|||
import com.google.common.collect.Iterators;
|
||||
import com.google.common.collect.Maps;
|
||||
|
||||
import org.apache.commons.logging.Log;
|
||||
import org.apache.commons.logging.LogFactory;
|
||||
import org.apache.hadoop.classification.InterfaceAudience;
|
||||
import org.apache.hadoop.classification.InterfaceAudience.Private;
|
||||
import org.apache.hadoop.io.WritableUtils;
|
||||
|
@ -54,6 +56,8 @@ public abstract class FileSystemCounterGroup<C extends Counter>
|
|||
static final int MAX_NUM_SCHEMES = 100; // intern/sanity check
|
||||
static final ConcurrentMap<String, String> schemes = Maps.newConcurrentMap();
|
||||
|
||||
private static final Log LOG = LogFactory.getLog(FileSystemCounterGroup.class);
|
||||
|
||||
// C[] would need Array.newInstance which requires a Class<C> reference.
|
||||
// Just a few local casts probably worth not having to carry it around.
|
||||
private final Map<String, Object[]> map =
|
||||
|
@ -159,13 +163,17 @@ public abstract class FileSystemCounterGroup<C extends Counter>
|
|||
else {
|
||||
ours = findCounter(counter.getName());
|
||||
}
|
||||
if (ours != null) {
|
||||
ours.setValue(counter.getValue());
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public C addCounter(String name, String displayName, long value) {
|
||||
C counter = findCounter(name);
|
||||
if (counter != null) {
|
||||
counter.setValue(value);
|
||||
}
|
||||
return counter;
|
||||
}
|
||||
|
||||
|
@ -192,13 +200,14 @@ public abstract class FileSystemCounterGroup<C extends Counter>
|
|||
}
|
||||
catch (Exception e) {
|
||||
if (create) throw new IllegalArgumentException(e);
|
||||
LOG.warn(counterName + " is not a recognized counter.");
|
||||
return null;
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public C findCounter(String counterName) {
|
||||
return findCounter(counterName, true);
|
||||
return findCounter(counterName, false);
|
||||
}
|
||||
|
||||
@SuppressWarnings("unchecked")
|
||||
|
|
|
@ -151,13 +151,21 @@ public abstract class FrameworkCounterGroup<T extends Enum<T>,
|
|||
@Override
|
||||
public void addCounter(C counter) {
|
||||
C ours = findCounter(counter.getName());
|
||||
if (ours != null) {
|
||||
ours.setValue(counter.getValue());
|
||||
} else {
|
||||
LOG.warn(counter.getName() + "is not a known counter.");
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public C addCounter(String name, String displayName, long value) {
|
||||
C counter = findCounter(name);
|
||||
if (counter != null) {
|
||||
counter.setValue(value);
|
||||
} else {
|
||||
LOG.warn(name + "is not a known counter.");
|
||||
}
|
||||
return counter;
|
||||
}
|
||||
|
||||
|
@ -179,7 +187,13 @@ public abstract class FrameworkCounterGroup<T extends Enum<T>,
|
|||
|
||||
@Override
|
||||
public C findCounter(String counterName) {
|
||||
return findCounter(valueOf(counterName));
|
||||
try {
|
||||
T enumValue = valueOf(counterName);
|
||||
return findCounter(enumValue);
|
||||
} catch (IllegalArgumentException e) {
|
||||
LOG.warn(counterName + " is not a recognized counter.");
|
||||
return null;
|
||||
}
|
||||
}
|
||||
|
||||
@SuppressWarnings("unchecked")
|
||||
|
@ -208,13 +222,15 @@ public abstract class FrameworkCounterGroup<T extends Enum<T>,
|
|||
}
|
||||
|
||||
@Override
|
||||
@SuppressWarnings("unchecked")
|
||||
@SuppressWarnings("rawtypes")
|
||||
public void incrAllCounters(CounterGroupBase<C> other) {
|
||||
if (checkNotNull(other, "other counter group")
|
||||
instanceof FrameworkCounterGroup<?, ?>) {
|
||||
for (Counter counter : other) {
|
||||
findCounter(((FrameworkCounter) counter).key.name())
|
||||
.increment(counter.getValue());
|
||||
C c = findCounter(((FrameworkCounter) counter).key.name());
|
||||
if (c != null) {
|
||||
c.increment(counter.getValue());
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
@ -33,9 +33,12 @@ import org.apache.commons.logging.LogFactory;
|
|||
import org.apache.hadoop.mapred.Counters.Counter;
|
||||
import org.apache.hadoop.mapred.Counters.CountersExceededException;
|
||||
import org.apache.hadoop.mapred.Counters.Group;
|
||||
import org.apache.hadoop.mapred.Counters.GroupFactory;
|
||||
import org.apache.hadoop.mapreduce.FileSystemCounter;
|
||||
import org.apache.hadoop.mapreduce.JobCounter;
|
||||
import org.apache.hadoop.mapreduce.TaskCounter;
|
||||
import org.apache.hadoop.mapreduce.counters.FrameworkCounterGroup;
|
||||
import org.apache.hadoop.mapreduce.counters.CounterGroupFactory.FrameworkGroupFactory;
|
||||
import org.junit.Test;
|
||||
|
||||
/**
|
||||
|
@ -321,4 +324,55 @@ public class TestCounters {
|
|||
public static void main(String[] args) throws IOException {
|
||||
new TestCounters().testCounters();
|
||||
}
|
||||
|
||||
@SuppressWarnings("rawtypes")
|
||||
@Test
|
||||
public void testFrameworkCounter() {
|
||||
GroupFactory groupFactory = new GroupFactoryForTest();
|
||||
FrameworkGroupFactory frameworkGroupFactory =
|
||||
groupFactory.newFrameworkGroupFactory(JobCounter.class);
|
||||
Group group = (Group) frameworkGroupFactory.newGroup("JobCounter");
|
||||
|
||||
FrameworkCounterGroup counterGroup =
|
||||
(FrameworkCounterGroup) group.getUnderlyingGroup();
|
||||
|
||||
org.apache.hadoop.mapreduce.Counter count1 =
|
||||
counterGroup.findCounter(JobCounter.NUM_FAILED_MAPS.toString());
|
||||
Assert.assertNotNull(count1);
|
||||
|
||||
// Verify no exception get thrown when finding an unknown counter
|
||||
org.apache.hadoop.mapreduce.Counter count2 =
|
||||
counterGroup.findCounter("Unknown");
|
||||
Assert.assertNull(count2);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testFilesystemCounter() {
|
||||
GroupFactory groupFactory = new GroupFactoryForTest();
|
||||
Group fsGroup = groupFactory.newFileSystemGroup();
|
||||
|
||||
org.apache.hadoop.mapreduce.Counter count1 =
|
||||
fsGroup.findCounter("ANY_BYTES_READ");
|
||||
Assert.assertNotNull(count1);
|
||||
|
||||
// Verify no exception get thrown when finding an unknown counter
|
||||
org.apache.hadoop.mapreduce.Counter count2 =
|
||||
fsGroup.findCounter("Unknown");
|
||||
Assert.assertNull(count2);
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
class GroupFactoryForTest extends GroupFactory {
|
||||
public <T extends Enum<T>>
|
||||
FrameworkGroupFactory<Group> newFrameworkGroupFactory(final Class<T> cls) {
|
||||
return super.newFrameworkGroupFactory(cls);
|
||||
}
|
||||
|
||||
public Group newFileSystemGroup() {
|
||||
return super.newFileSystemGroup();
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
|
||||
|
|
Loading…
Reference in New Issue