MAPREDUCE-5831. Make MR client ignore unknown counters received from AM. Contributed by Junping Du.
This commit is contained in:
parent
4ea77efa3a
commit
662fc11ae7
|
@ -377,6 +377,9 @@ Release 2.6.0 - UNRELEASED
|
||||||
|
|
||||||
MAPREDUCE-6093. minor distcp doc edits (Charles Lamb via aw)
|
MAPREDUCE-6093. minor distcp doc edits (Charles Lamb via aw)
|
||||||
|
|
||||||
|
MAPREDUCE-5831. Make MR client ignore unknown counters received from AM.
|
||||||
|
(Junping Du via zjshen)
|
||||||
|
|
||||||
Release 2.5.1 - 2014-09-05
|
Release 2.5.1 - 2014-09-05
|
||||||
|
|
||||||
INCOMPATIBLE CHANGES
|
INCOMPATIBLE CHANGES
|
||||||
|
|
|
@ -241,7 +241,10 @@ public class TypeConverter {
|
||||||
org.apache.hadoop.mapreduce.Counter c =
|
org.apache.hadoop.mapreduce.Counter c =
|
||||||
counters.findCounter(yGrp.getName(),
|
counters.findCounter(yGrp.getName(),
|
||||||
yCntr.getName());
|
yCntr.getName());
|
||||||
c.setValue(yCntr.getValue());
|
// if c can be found, or it will be skipped.
|
||||||
|
if (c != null) {
|
||||||
|
c.setValue(yCntr.getValue());
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
return counters;
|
return counters;
|
||||||
|
|
|
@ -34,6 +34,8 @@ import com.google.common.collect.AbstractIterator;
|
||||||
import com.google.common.collect.Iterators;
|
import com.google.common.collect.Iterators;
|
||||||
import com.google.common.collect.Maps;
|
import com.google.common.collect.Maps;
|
||||||
|
|
||||||
|
import org.apache.commons.logging.Log;
|
||||||
|
import org.apache.commons.logging.LogFactory;
|
||||||
import org.apache.hadoop.classification.InterfaceAudience;
|
import org.apache.hadoop.classification.InterfaceAudience;
|
||||||
import org.apache.hadoop.classification.InterfaceAudience.Private;
|
import org.apache.hadoop.classification.InterfaceAudience.Private;
|
||||||
import org.apache.hadoop.io.WritableUtils;
|
import org.apache.hadoop.io.WritableUtils;
|
||||||
|
@ -54,6 +56,8 @@ public abstract class FileSystemCounterGroup<C extends Counter>
|
||||||
static final int MAX_NUM_SCHEMES = 100; // intern/sanity check
|
static final int MAX_NUM_SCHEMES = 100; // intern/sanity check
|
||||||
static final ConcurrentMap<String, String> schemes = Maps.newConcurrentMap();
|
static final ConcurrentMap<String, String> schemes = Maps.newConcurrentMap();
|
||||||
|
|
||||||
|
private static final Log LOG = LogFactory.getLog(FileSystemCounterGroup.class);
|
||||||
|
|
||||||
// C[] would need Array.newInstance which requires a Class<C> reference.
|
// C[] would need Array.newInstance which requires a Class<C> reference.
|
||||||
// Just a few local casts probably worth not having to carry it around.
|
// Just a few local casts probably worth not having to carry it around.
|
||||||
private final Map<String, Object[]> map =
|
private final Map<String, Object[]> map =
|
||||||
|
@ -159,13 +163,17 @@ public abstract class FileSystemCounterGroup<C extends Counter>
|
||||||
else {
|
else {
|
||||||
ours = findCounter(counter.getName());
|
ours = findCounter(counter.getName());
|
||||||
}
|
}
|
||||||
ours.setValue(counter.getValue());
|
if (ours != null) {
|
||||||
|
ours.setValue(counter.getValue());
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public C addCounter(String name, String displayName, long value) {
|
public C addCounter(String name, String displayName, long value) {
|
||||||
C counter = findCounter(name);
|
C counter = findCounter(name);
|
||||||
counter.setValue(value);
|
if (counter != null) {
|
||||||
|
counter.setValue(value);
|
||||||
|
}
|
||||||
return counter;
|
return counter;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -192,13 +200,14 @@ public abstract class FileSystemCounterGroup<C extends Counter>
|
||||||
}
|
}
|
||||||
catch (Exception e) {
|
catch (Exception e) {
|
||||||
if (create) throw new IllegalArgumentException(e);
|
if (create) throw new IllegalArgumentException(e);
|
||||||
|
LOG.warn(counterName + " is not a recognized counter.");
|
||||||
return null;
|
return null;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public C findCounter(String counterName) {
|
public C findCounter(String counterName) {
|
||||||
return findCounter(counterName, true);
|
return findCounter(counterName, false);
|
||||||
}
|
}
|
||||||
|
|
||||||
@SuppressWarnings("unchecked")
|
@SuppressWarnings("unchecked")
|
||||||
|
|
|
@ -151,13 +151,21 @@ public abstract class FrameworkCounterGroup<T extends Enum<T>,
|
||||||
@Override
|
@Override
|
||||||
public void addCounter(C counter) {
|
public void addCounter(C counter) {
|
||||||
C ours = findCounter(counter.getName());
|
C ours = findCounter(counter.getName());
|
||||||
ours.setValue(counter.getValue());
|
if (ours != null) {
|
||||||
|
ours.setValue(counter.getValue());
|
||||||
|
} else {
|
||||||
|
LOG.warn(counter.getName() + "is not a known counter.");
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public C addCounter(String name, String displayName, long value) {
|
public C addCounter(String name, String displayName, long value) {
|
||||||
C counter = findCounter(name);
|
C counter = findCounter(name);
|
||||||
counter.setValue(value);
|
if (counter != null) {
|
||||||
|
counter.setValue(value);
|
||||||
|
} else {
|
||||||
|
LOG.warn(name + "is not a known counter.");
|
||||||
|
}
|
||||||
return counter;
|
return counter;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -179,7 +187,13 @@ public abstract class FrameworkCounterGroup<T extends Enum<T>,
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public C findCounter(String counterName) {
|
public C findCounter(String counterName) {
|
||||||
return findCounter(valueOf(counterName));
|
try {
|
||||||
|
T enumValue = valueOf(counterName);
|
||||||
|
return findCounter(enumValue);
|
||||||
|
} catch (IllegalArgumentException e) {
|
||||||
|
LOG.warn(counterName + " is not a recognized counter.");
|
||||||
|
return null;
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@SuppressWarnings("unchecked")
|
@SuppressWarnings("unchecked")
|
||||||
|
@ -208,13 +222,15 @@ public abstract class FrameworkCounterGroup<T extends Enum<T>,
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
@SuppressWarnings("unchecked")
|
@SuppressWarnings("rawtypes")
|
||||||
public void incrAllCounters(CounterGroupBase<C> other) {
|
public void incrAllCounters(CounterGroupBase<C> other) {
|
||||||
if (checkNotNull(other, "other counter group")
|
if (checkNotNull(other, "other counter group")
|
||||||
instanceof FrameworkCounterGroup<?, ?>) {
|
instanceof FrameworkCounterGroup<?, ?>) {
|
||||||
for (Counter counter : other) {
|
for (Counter counter : other) {
|
||||||
findCounter(((FrameworkCounter) counter).key.name())
|
C c = findCounter(((FrameworkCounter) counter).key.name());
|
||||||
.increment(counter.getValue());
|
if (c != null) {
|
||||||
|
c.increment(counter.getValue());
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -33,9 +33,12 @@ import org.apache.commons.logging.LogFactory;
|
||||||
import org.apache.hadoop.mapred.Counters.Counter;
|
import org.apache.hadoop.mapred.Counters.Counter;
|
||||||
import org.apache.hadoop.mapred.Counters.CountersExceededException;
|
import org.apache.hadoop.mapred.Counters.CountersExceededException;
|
||||||
import org.apache.hadoop.mapred.Counters.Group;
|
import org.apache.hadoop.mapred.Counters.Group;
|
||||||
|
import org.apache.hadoop.mapred.Counters.GroupFactory;
|
||||||
import org.apache.hadoop.mapreduce.FileSystemCounter;
|
import org.apache.hadoop.mapreduce.FileSystemCounter;
|
||||||
import org.apache.hadoop.mapreduce.JobCounter;
|
import org.apache.hadoop.mapreduce.JobCounter;
|
||||||
import org.apache.hadoop.mapreduce.TaskCounter;
|
import org.apache.hadoop.mapreduce.TaskCounter;
|
||||||
|
import org.apache.hadoop.mapreduce.counters.FrameworkCounterGroup;
|
||||||
|
import org.apache.hadoop.mapreduce.counters.CounterGroupFactory.FrameworkGroupFactory;
|
||||||
import org.junit.Test;
|
import org.junit.Test;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
@ -321,4 +324,55 @@ public class TestCounters {
|
||||||
public static void main(String[] args) throws IOException {
|
public static void main(String[] args) throws IOException {
|
||||||
new TestCounters().testCounters();
|
new TestCounters().testCounters();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@SuppressWarnings("rawtypes")
|
||||||
|
@Test
|
||||||
|
public void testFrameworkCounter() {
|
||||||
|
GroupFactory groupFactory = new GroupFactoryForTest();
|
||||||
|
FrameworkGroupFactory frameworkGroupFactory =
|
||||||
|
groupFactory.newFrameworkGroupFactory(JobCounter.class);
|
||||||
|
Group group = (Group) frameworkGroupFactory.newGroup("JobCounter");
|
||||||
|
|
||||||
|
FrameworkCounterGroup counterGroup =
|
||||||
|
(FrameworkCounterGroup) group.getUnderlyingGroup();
|
||||||
|
|
||||||
|
org.apache.hadoop.mapreduce.Counter count1 =
|
||||||
|
counterGroup.findCounter(JobCounter.NUM_FAILED_MAPS.toString());
|
||||||
|
Assert.assertNotNull(count1);
|
||||||
|
|
||||||
|
// Verify no exception get thrown when finding an unknown counter
|
||||||
|
org.apache.hadoop.mapreduce.Counter count2 =
|
||||||
|
counterGroup.findCounter("Unknown");
|
||||||
|
Assert.assertNull(count2);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testFilesystemCounter() {
|
||||||
|
GroupFactory groupFactory = new GroupFactoryForTest();
|
||||||
|
Group fsGroup = groupFactory.newFileSystemGroup();
|
||||||
|
|
||||||
|
org.apache.hadoop.mapreduce.Counter count1 =
|
||||||
|
fsGroup.findCounter("ANY_BYTES_READ");
|
||||||
|
Assert.assertNotNull(count1);
|
||||||
|
|
||||||
|
// Verify no exception get thrown when finding an unknown counter
|
||||||
|
org.apache.hadoop.mapreduce.Counter count2 =
|
||||||
|
fsGroup.findCounter("Unknown");
|
||||||
|
Assert.assertNull(count2);
|
||||||
|
}
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
||||||
|
class GroupFactoryForTest extends GroupFactory {
|
||||||
|
public <T extends Enum<T>>
|
||||||
|
FrameworkGroupFactory<Group> newFrameworkGroupFactory(final Class<T> cls) {
|
||||||
|
return super.newFrameworkGroupFactory(cls);
|
||||||
|
}
|
||||||
|
|
||||||
|
public Group newFileSystemGroup() {
|
||||||
|
return super.newFileSystemGroup();
|
||||||
|
}
|
||||||
|
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
|
Loading…
Reference in New Issue