MAPREDUCE-3697. Support binary compatibility for Counters after MAPREDUCE-901.

git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/trunk@1241319 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Arun Murthy 2012-02-07 01:41:28 +00:00
parent 2d1406e9e7
commit 35f12b9556
11 changed files with 318 additions and 155 deletions

View File

@ -736,6 +736,9 @@ Release 0.23.1 - Unreleased
MAPREDUCE-3794. Support mapred.Task.Counter and mapred.JobInProgress.Counter
enums for compatibility (Tom White via mahadev)
MAPREDUCE-3697. Support binary compatibility for Counters after
MAPREDUCE-901. (mahadev via acmurthy)
Release 0.23.0 - 2011-11-01
INCOMPATIBLE CHANGES

View File

@ -21,8 +21,14 @@
import static org.apache.hadoop.mapreduce.util.CountersStrings.parseEscapedCompactString;
import static org.apache.hadoop.mapreduce.util.CountersStrings.toEscapedCompactString;
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import java.text.ParseException;
import java.util.Collection;
import java.util.Iterator;
import org.apache.commons.collections.IteratorUtils;
import org.apache.commons.logging.Log;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
@ -36,6 +42,9 @@
import org.apache.hadoop.mapreduce.counters.GenericCounter;
import org.apache.hadoop.mapreduce.counters.Limits;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormatCounter;
import org.apache.hadoop.mapreduce.util.CountersStrings;
import com.google.common.collect.Iterators;
/**
* A set of named counters.
@ -52,6 +61,8 @@
public class Counters
extends AbstractCounters<Counters.Counter, Counters.Group> {
public static int MAX_COUNTER_LIMIT = Limits.COUNTERS_MAX;
public Counters() {
super(groupFactory);
}
@ -69,17 +80,82 @@ static Counters downgrade(org.apache.hadoop.mapreduce.Counters newCounters) {
return new Counters(newCounters);
}
public synchronized Group getGroup(String groupName) {
return super.getGroup(groupName);
}
@SuppressWarnings("unchecked")
public synchronized Collection<String> getGroupNames() {
return IteratorUtils.toList(super.getGroupNames().iterator());
}
public synchronized String makeCompactString() {
return CountersStrings.toEscapedCompactString(this);
}
/**
* A counter record, comprising its name and value.
*/
public interface Counter extends org.apache.hadoop.mapreduce.Counter {
public static class Counter implements org.apache.hadoop.mapreduce.Counter {
org.apache.hadoop.mapreduce.Counter realCounter;
Counter(org.apache.hadoop.mapreduce.Counter counter) {
this.realCounter = counter;
}
public Counter() {
this(new GenericCounter());
}
@SuppressWarnings("deprecation")
@Override
public void setDisplayName(String displayName) {
realCounter.setDisplayName(displayName);
}
@Override
public String getName() {
return realCounter.getName();
}
@Override
public String getDisplayName() {
return realCounter.getDisplayName();
}
@Override
public long getValue() {
return realCounter.getValue();
}
@Override
public void setValue(long value) {
realCounter.setValue(value);
}
@Override
public void increment(long incr) {
realCounter.increment(incr);
}
@Override
public void write(DataOutput out) throws IOException {
realCounter.write(out);
}
@Override
public void readFields(DataInput in) throws IOException {
realCounter.readFields(in);
}
/**
* Returns the compact stringified version of the counter in the format
* [(actual-name)(display-name)(value)]
* @return the stringified result
*/
String makeEscapedCompactString();
public String makeEscapedCompactString() {
return toEscapedCompactString(realCounter);
}
/**
* Checks for (content) equality of two (basic) counters
@ -88,38 +164,41 @@ public interface Counter extends org.apache.hadoop.mapreduce.Counter {
* @deprecated
*/
@Deprecated
boolean contentEquals(Counter counter);
public boolean contentEquals(Counter counter) {
return realCounter.equals(counter.getUnderlyingCounter());
}
/**
* @return the value of the counter
*/
long getCounter();
}
static class OldCounterImpl extends GenericCounter implements Counter {
OldCounterImpl() {
}
OldCounterImpl(String name, String displayName, long value) {
super(name, displayName, value);
}
@Override
public synchronized String makeEscapedCompactString() {
return toEscapedCompactString(this);
}
@Override @Deprecated
public boolean contentEquals(Counter counter) {
return equals(counter);
}
@Override
public long getCounter() {
return getValue();
return realCounter.getValue();
}
@Override
public org.apache.hadoop.mapreduce.Counter getUnderlyingCounter() {
return realCounter;
}
@Override
public synchronized boolean equals(Object genericRight) {
if (genericRight instanceof Counter) {
synchronized (genericRight) {
Counter right = (Counter) genericRight;
return getName().equals(right.getName()) &&
getDisplayName().equals(right.getDisplayName()) &&
getValue() == right.getValue();
}
}
return false;
}
@Override
public int hashCode() {
return realCounter.hashCode();
}
}
/**
* <code>Group</code> of counters, comprising of counters from a particular
@ -128,21 +207,38 @@ public long getCounter() {
* <p><code>Group</code>handles localization of the class name and the
* counter names.</p>
*/
public static interface Group extends CounterGroupBase<Counter> {
public static class Group implements CounterGroupBase<Counter> {
private CounterGroupBase<Counter> realGroup;
Group(GenericGroup group) {
this.realGroup = group;
}
Group(FSGroupImpl group) {
this.realGroup = group;
}
@SuppressWarnings({ "unchecked", "rawtypes" })
Group(FrameworkGroupImpl group) {
this.realGroup = group;
}
/**
* @param counterName the name of the counter
* @return the value of the specified counter, or 0 if the counter does
* not exist.
*/
long getCounter(String counterName);
public long getCounter(String counterName) {
return getCounterValue(realGroup, counterName);
}
/**
* @return the compact stringified version of the group in the format
* {(actual-name)(display-name)(value)[][][]} where [] are compact strings
* for the counters within.
*/
String makeEscapedCompactString();
public String makeEscapedCompactString() {
return toEscapedCompactString(realGroup);
}
/**
* Get the counter for the given id and create it if it doesn't exist.
@ -152,172 +248,184 @@ public static interface Group extends CounterGroupBase<Counter> {
* @deprecated use {@link #findCounter(String)} instead
*/
@Deprecated
Counter getCounter(int id, String name);
public Counter getCounter(int id, String name) {
return findCounter(name);
}
/**
* Get the counter for the given name and create it if it doesn't exist.
* @param name the internal counter name
* @return the counter
*/
Counter getCounterForName(String name);
public Counter getCounterForName(String name) {
return findCounter(name);
}
@Override
public void write(DataOutput out) throws IOException {
realGroup.write(out);
}
@Override
public void readFields(DataInput in) throws IOException {
realGroup.readFields(in);
}
@Override
public Iterator<Counter> iterator() {
return realGroup.iterator();
}
@Override
public String getName() {
return realGroup.getName();
}
@Override
public String getDisplayName() {
return realGroup.getDisplayName();
}
@Override
public void setDisplayName(String displayName) {
realGroup.setDisplayName(displayName);
}
@Override
public void addCounter(Counter counter) {
realGroup.addCounter(counter);
}
@Override
public Counter addCounter(String name, String displayName, long value) {
return realGroup.addCounter(name, displayName, value);
}
@Override
public Counter findCounter(String counterName, String displayName) {
return realGroup.findCounter(counterName, displayName);
}
@Override
public Counter findCounter(String counterName, boolean create) {
return realGroup.findCounter(counterName, create);
}
@Override
public Counter findCounter(String counterName) {
return realGroup.findCounter(counterName);
}
@Override
public int size() {
return realGroup.size();
}
@Override
public void incrAllCounters(CounterGroupBase<Counter> rightGroup) {
realGroup.incrAllCounters(rightGroup);
}
@Override
public CounterGroupBase<Counter> getUnderlyingGroup() {
return realGroup;
}
@Override
public synchronized boolean equals(Object genericRight) {
if (genericRight instanceof CounterGroupBase<?>) {
@SuppressWarnings("unchecked")
CounterGroupBase<Counter> right = ((CounterGroupBase<Counter>)
genericRight).getUnderlyingGroup();
return Iterators.elementsEqual(iterator(), right.iterator());
}
return false;
}
@Override
public int hashCode() {
return realGroup.hashCode();
}
}
// All the group impls need this for legacy group interface
static long getCounterValue(Group group, String counterName) {
static long getCounterValue(CounterGroupBase<Counter> group, String counterName) {
Counter counter = group.findCounter(counterName, false);
if (counter != null) return counter.getValue();
return 0L;
}
// Mix the generic group implementation into the Group interface
private static class GenericGroup extends AbstractCounterGroup<Counter>
implements Group {
private static class GenericGroup extends AbstractCounterGroup<Counter> {
GenericGroup(String name, String displayName, Limits limits) {
super(name, displayName, limits);
}
@Override
public long getCounter(String counterName) {
return getCounterValue(this, counterName);
}
@Override
public String makeEscapedCompactString() {
return toEscapedCompactString(this);
}
@Override
public Counter getCounter(int id, String name) {
return findCounter(name);
}
@Override
public Counter getCounterForName(String name) {
return findCounter(name);
}
@Override
protected Counter newCounter(String counterName, String displayName,
long value) {
return new OldCounterImpl(counterName, displayName, value);
return new Counter(new GenericCounter(counterName, displayName, value));
}
@Override
protected Counter newCounter() {
return new OldCounterImpl();
return new Counter();
}
@Override
public CounterGroupBase<Counter> getUnderlyingGroup() {
return this;
}
}
// Mix the framework group implementation into the Group interface
private static class FrameworkGroupImpl<T extends Enum<T>>
extends FrameworkCounterGroup<T, Counter> implements Group {
// Mix the framework counter implmementation into the Counter interface
class FrameworkCounterImpl extends FrameworkCounter implements Counter {
extends FrameworkCounterGroup<T, Counter> {
// Mix the framework counter implementation into the Counter interface
class FrameworkCounterImpl extends FrameworkCounter {
FrameworkCounterImpl(T key) {
super(key);
}
@Override
public String makeEscapedCompactString() {
return toEscapedCompactString(this);
}
@Override
public boolean contentEquals(Counter counter) {
return equals(counter);
}
@Override
public long getCounter() {
return getValue();
}
}
FrameworkGroupImpl(Class<T> cls) {
super(cls);
}
@Override
public long getCounter(String counterName) {
return getCounterValue(this, counterName);
}
@Override
public String makeEscapedCompactString() {
return toEscapedCompactString(this);
}
@Override @Deprecated
public Counter getCounter(int id, String name) {
return findCounter(name);
}
@Override
public Counter getCounterForName(String name) {
return findCounter(name);
}
@Override
protected Counter newCounter(T key) {
return new FrameworkCounterImpl(key);
return new Counter(new FrameworkCounterImpl(key));
}
@Override
public CounterGroupBase<Counter> getUnderlyingGroup() {
return this;
}
}
// Mix the file system counter group implementation into the Group interface
private static class FSGroupImpl extends FileSystemCounterGroup<Counter>
implements Group {
private static class FSGroupImpl extends FileSystemCounterGroup<Counter> {
private class FSCounterImpl extends FSCounter implements Counter {
private class FSCounterImpl extends FSCounter {
FSCounterImpl(String scheme, FileSystemCounter key) {
super(scheme, key);
}
@Override
public String makeEscapedCompactString() {
return toEscapedCompactString(this);
}
@Override @Deprecated
public boolean contentEquals(Counter counter) {
throw new UnsupportedOperationException("Not supported yet.");
}
@Override
public long getCounter() {
return getValue();
}
}
@Override
protected Counter newCounter(String scheme, FileSystemCounter key) {
return new FSCounterImpl(scheme, key);
return new Counter(new FSCounterImpl(scheme, key));
}
@Override
public long getCounter(String counterName) {
return getCounterValue(this, counterName);
public CounterGroupBase<Counter> getUnderlyingGroup() {
return this;
}
@Override
public String makeEscapedCompactString() {
return toEscapedCompactString(this);
}
@Override @Deprecated
public Counter getCounter(int id, String name) {
return findCounter(name);
}
@Override
public Counter getCounterForName(String name) {
return findCounter(name);
}
}
public synchronized Counter findCounter(String group, String name) {
@ -342,7 +450,7 @@ static class GroupFactory extends CounterGroupFactory<Counter, Group> {
FrameworkGroupFactory<Group> newFrameworkGroupFactory(final Class<T> cls) {
return new FrameworkGroupFactory<Group>() {
@Override public Group newGroup(String name) {
return new FrameworkGroupImpl<T>(cls); // impl in this package
return new Group(new FrameworkGroupImpl<T>(cls)); // impl in this package
}
};
}
@ -350,12 +458,12 @@ FrameworkGroupFactory<Group> newFrameworkGroupFactory(final Class<T> cls) {
@Override
protected Group newGenericGroup(String name, String displayName,
Limits limits) {
return new GenericGroup(name, displayName, limits);
return new Group(new GenericGroup(name, displayName, limits));
}
@Override
protected Group newFileSystemGroup() {
return new FSGroupImpl();
return new Group(new FSGroupImpl());
}
}

View File

@ -72,4 +72,10 @@ public interface Counter extends Writable {
* @param incr the value to increase this counter by
*/
void increment(long incr);
/**
* Return the underlying object if this is a facade.
* @return the undelying object.
*/
Counter getUnderlyingCounter();
}

View File

@ -52,6 +52,11 @@ private static class FrameworkGroupImpl<T extends Enum<T>>
protected FrameworkCounter newCounter(T key) {
return new FrameworkCounter(key);
}
@Override
public CounterGroupBase<Counter> getUnderlyingGroup() {
return this;
}
}
// Mix generic group implementation into CounterGroup interface
@ -72,6 +77,11 @@ protected Counter newCounter(String name, String displayName, long value) {
protected Counter newCounter() {
return new GenericCounter();
}
@Override
public CounterGroupBase<Counter> getUnderlyingGroup() {
return this;
}
}
// Mix file system group implementation into the CounterGroup interface
@ -82,6 +92,11 @@ private static class FileSystemGroup extends FileSystemCounterGroup<Counter>
protected Counter newCounter(String scheme, FileSystemCounter key) {
return new FSCounter(scheme, key);
}
@Override
public CounterGroupBase<Counter> getUnderlyingGroup() {
return this;
}
}
/**

View File

@ -172,7 +172,8 @@ public synchronized C findCounter(Enum<?> key) {
@InterfaceAudience.Private
public synchronized C findCounter(String scheme, FileSystemCounter key) {
return ((FileSystemCounterGroup<C>) getGroup(
FileSystemCounter.class.getName())).findCounter(scheme, key);
FileSystemCounter.class.getName()).getUnderlyingGroup()).
findCounter(scheme, key);
}
/**
@ -243,11 +244,11 @@ public synchronized void write(DataOutput out) throws IOException {
WritableUtils.writeVInt(out, groupFactory.version());
WritableUtils.writeVInt(out, fgroups.size()); // framework groups first
for (G group : fgroups.values()) {
if (group instanceof FrameworkCounterGroup<?, ?>) {
if (group.getUnderlyingGroup() instanceof FrameworkCounterGroup<?, ?>) {
WritableUtils.writeVInt(out, GroupType.FRAMEWORK.ordinal());
WritableUtils.writeVInt(out, getFrameworkGroupId(group.getName()));
group.write(out);
} else if (group instanceof FileSystemCounterGroup<?>) {
} else if (group.getUnderlyingGroup() instanceof FileSystemCounterGroup<?>) {
WritableUtils.writeVInt(out, GroupType.FILESYSTEM.ordinal());
group.write(out);
}

View File

@ -98,4 +98,10 @@ public interface CounterGroupBase<T extends Counter>
* @param rightGroup the group to be added to this group
*/
void incrAllCounters(CounterGroupBase<T> rightGroup);
/**
* Exposes the underlying group type if a facade.
* @return the underlying object that this object is wrapping up.
*/
CounterGroupBase<T> getUnderlyingGroup();
}

View File

@ -110,6 +110,11 @@ public void write(DataOutput out) throws IOException {
public void readFields(DataInput in) throws IOException {
assert false : "shouldn't be called";
}
@Override
public Counter getUnderlyingCounter() {
return this;
}
}
@Override
@ -231,10 +236,10 @@ public int size() {
@Override
@SuppressWarnings("unchecked")
public void incrAllCounters(CounterGroupBase<C> other) {
if (checkNotNull(other, "other group")
if (checkNotNull(other.getUnderlyingGroup(), "other group")
instanceof FileSystemCounterGroup<?>) {
for (Counter counter : other) {
FSCounter c = (FSCounter) counter;
FSCounter c = (FSCounter) ((Counter)counter).getUnderlyingCounter();
findCounter(c.scheme, c.key) .increment(counter.getValue());
}
}
@ -253,7 +258,7 @@ public void write(DataOutput out) throws IOException {
for (Object counter : entry.getValue()) {
if (counter == null) continue;
@SuppressWarnings("unchecked")
FSCounter c = (FSCounter) counter;
FSCounter c = (FSCounter) ((Counter)counter).getUnderlyingCounter();
WritableUtils.writeVInt(out, c.key.ordinal()); // key
WritableUtils.writeVLong(out, c.getValue()); // value
}

View File

@ -18,21 +18,24 @@
package org.apache.hadoop.mapreduce.counters;
import static com.google.common.base.Preconditions.checkNotNull;
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import java.util.Arrays;
import java.util.Iterator;
import static com.google.common.base.Preconditions.*;
import com.google.common.collect.AbstractIterator;
import com.google.common.collect.Iterators;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.io.WritableUtils;
import org.apache.hadoop.mapreduce.Counter;
import org.apache.hadoop.mapreduce.util.ResourceBundles;
import com.google.common.collect.AbstractIterator;
import com.google.common.collect.Iterators;
/**
* An abstract class to provide common implementation for the framework
* counter group in both mapred and mapreduce packages.
@ -43,6 +46,7 @@
@InterfaceAudience.Private
public abstract class FrameworkCounterGroup<T extends Enum<T>,
C extends Counter> implements CounterGroupBase<C> {
private static final Log LOG = LogFactory.getLog(FrameworkCounterGroup.class);
private final Class<T> enumClass; // for Enum.valueOf
private final Object[] counters; // local casts are OK and save a class ref
@ -95,6 +99,11 @@ public void write(DataOutput out) throws IOException {
public void readFields(DataInput in) throws IOException {
assert false : "shouldn't be called";
}
@Override
public Counter getUnderlyingCounter() {
return this;
}
}
@SuppressWarnings("unchecked")

View File

@ -25,6 +25,7 @@
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.WritableUtils;
import org.apache.hadoop.mapreduce.Counter;
/**
* A generic counter implementation
@ -101,4 +102,9 @@ public synchronized void setValue(long value) {
public synchronized void increment(long incr) {
value += incr;
}
@Override
public Counter getUnderlyingCounter() {
return this;
}
}

View File

@ -24,6 +24,8 @@
import java.util.Iterator;
import java.util.Random;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.mapred.Counters.Counter;
import org.apache.hadoop.mapred.Counters.Group;
import org.apache.hadoop.mapreduce.FileSystemCounter;
@ -37,6 +39,7 @@
public class TestCounters {
enum myCounters {TEST1, TEST2};
private static final long MAX_VALUE = 10;
private static final Log LOG = LogFactory.getLog(TestCounters.class);
// Generates enum based counters
private Counters getEnumCounters(Enum[] keys) {

View File

@ -18,6 +18,10 @@
package org.apache.hadoop.mapred;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertTrue;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Formatter;
@ -25,11 +29,6 @@
import java.util.List;
import java.util.StringTokenizer;
import org.junit.AfterClass;
import org.junit.BeforeClass;
import org.junit.Test;
import static org.junit.Assert.*;
import org.apache.commons.lang.RandomStringUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataOutputStream;
@ -40,12 +39,14 @@
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.mapreduce.Cluster;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.TaskCounter;
import org.apache.hadoop.mapreduce.TaskType;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormatCounter;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormatCounter;
import org.junit.AfterClass;
import org.junit.BeforeClass;
import org.junit.Test;
/**
* This is an wordcount application that tests the count of records