MAPREDUCE-3827. Changed Counters to use ConcurrentSkipListMap for performance. Contributed by Vinod K V.
git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/trunk@1241711 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
parent
ccc2807d86
commit
04f685ea76
|
@ -764,6 +764,9 @@ Release 0.23.1 - Unreleased
|
||||||
MAPREDUCE-3823. Ensure counters are calculated only once after a job
|
MAPREDUCE-3823. Ensure counters are calculated only once after a job
|
||||||
finishes. (Vinod Kumar Vavilapalli via sseth)
|
finishes. (Vinod Kumar Vavilapalli via sseth)
|
||||||
|
|
||||||
|
MAPREDUCE-3827. Changed Counters to use ConcurrentSkipListMap for
|
||||||
|
performance. (vinodkv via acmurthy)
|
||||||
|
|
||||||
Release 0.23.0 - 2011-11-01
|
Release 0.23.0 - 2011-11-01
|
||||||
|
|
||||||
INCOMPATIBLE CHANGES
|
INCOMPATIBLE CHANGES
|
||||||
|
|
|
@ -22,11 +22,8 @@ import java.io.DataInput;
|
||||||
import java.io.DataOutput;
|
import java.io.DataOutput;
|
||||||
import java.io.IOException;
|
import java.io.IOException;
|
||||||
import java.util.Iterator;
|
import java.util.Iterator;
|
||||||
import java.util.Map;
|
import java.util.concurrent.ConcurrentMap;
|
||||||
|
import java.util.concurrent.ConcurrentSkipListMap;
|
||||||
import com.google.common.collect.ImmutableSet;
|
|
||||||
import com.google.common.collect.Iterators;
|
|
||||||
import com.google.common.collect.Maps;
|
|
||||||
|
|
||||||
import org.apache.hadoop.classification.InterfaceAudience;
|
import org.apache.hadoop.classification.InterfaceAudience;
|
||||||
import org.apache.hadoop.io.Text;
|
import org.apache.hadoop.io.Text;
|
||||||
|
@ -34,6 +31,8 @@ import org.apache.hadoop.io.WritableUtils;
|
||||||
import org.apache.hadoop.mapreduce.Counter;
|
import org.apache.hadoop.mapreduce.Counter;
|
||||||
import org.apache.hadoop.mapreduce.util.ResourceBundles;
|
import org.apache.hadoop.mapreduce.util.ResourceBundles;
|
||||||
|
|
||||||
|
import com.google.common.collect.Iterators;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* An abstract class to provide common implementation of the
|
* An abstract class to provide common implementation of the
|
||||||
* generic counter group in both mapred and mapreduce package.
|
* generic counter group in both mapred and mapreduce package.
|
||||||
|
@ -46,7 +45,8 @@ public abstract class AbstractCounterGroup<T extends Counter>
|
||||||
|
|
||||||
private final String name;
|
private final String name;
|
||||||
private String displayName;
|
private String displayName;
|
||||||
private final Map<String, T> counters = Maps.newTreeMap();
|
private final ConcurrentMap<String, T> counters =
|
||||||
|
new ConcurrentSkipListMap<String, T>();
|
||||||
private final Limits limits;
|
private final Limits limits;
|
||||||
|
|
||||||
public AbstractCounterGroup(String name, String displayName,
|
public AbstractCounterGroup(String name, String displayName,
|
||||||
|
@ -80,7 +80,7 @@ public abstract class AbstractCounterGroup<T extends Counter>
|
||||||
@Override
|
@Override
|
||||||
public synchronized T addCounter(String counterName, String displayName,
|
public synchronized T addCounter(String counterName, String displayName,
|
||||||
long value) {
|
long value) {
|
||||||
String saveName = limits.filterCounterName(counterName);
|
String saveName = Limits.filterCounterName(counterName);
|
||||||
T counter = findCounterImpl(saveName, false);
|
T counter = findCounterImpl(saveName, false);
|
||||||
if (counter == null) {
|
if (counter == null) {
|
||||||
return addCounterImpl(saveName, displayName, value);
|
return addCounterImpl(saveName, displayName, value);
|
||||||
|
@ -97,7 +97,9 @@ public abstract class AbstractCounterGroup<T extends Counter>
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public synchronized T findCounter(String counterName, String displayName) {
|
public synchronized T findCounter(String counterName, String displayName) {
|
||||||
String saveName = limits.filterCounterName(counterName);
|
// Take lock to avoid two threads not finding a counter and trying to add
|
||||||
|
// the same counter.
|
||||||
|
String saveName = Limits.filterCounterName(counterName);
|
||||||
T counter = findCounterImpl(saveName, false);
|
T counter = findCounterImpl(saveName, false);
|
||||||
if (counter == null) {
|
if (counter == null) {
|
||||||
return addCounterImpl(saveName, displayName, 0);
|
return addCounterImpl(saveName, displayName, 0);
|
||||||
|
@ -106,10 +108,12 @@ public abstract class AbstractCounterGroup<T extends Counter>
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public synchronized T findCounter(String counterName, boolean create) {
|
public T findCounter(String counterName, boolean create) {
|
||||||
return findCounterImpl(limits.filterCounterName(counterName), create);
|
return findCounterImpl(Limits.filterCounterName(counterName), create);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Lock the object. Cannot simply use concurrent constructs on the counters
|
||||||
|
// data-structure (like putIfAbsent) because of localization, limits etc.
|
||||||
private synchronized T findCounterImpl(String counterName, boolean create) {
|
private synchronized T findCounterImpl(String counterName, boolean create) {
|
||||||
T counter = counters.get(counterName);
|
T counter = counters.get(counterName);
|
||||||
if (counter == null && create) {
|
if (counter == null && create) {
|
||||||
|
@ -142,8 +146,8 @@ public abstract class AbstractCounterGroup<T extends Counter>
|
||||||
protected abstract T newCounter();
|
protected abstract T newCounter();
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public synchronized Iterator<T> iterator() {
|
public Iterator<T> iterator() {
|
||||||
return ImmutableSet.copyOf(counters.values()).iterator();
|
return counters.values().iterator();
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
|
|
@ -18,19 +18,18 @@
|
||||||
|
|
||||||
package org.apache.hadoop.mapreduce.counters;
|
package org.apache.hadoop.mapreduce.counters;
|
||||||
|
|
||||||
|
import static org.apache.hadoop.mapreduce.counters.CounterGroupFactory.getFrameworkGroupId;
|
||||||
|
import static org.apache.hadoop.mapreduce.counters.CounterGroupFactory.isFrameworkGroup;
|
||||||
|
|
||||||
import java.io.DataInput;
|
import java.io.DataInput;
|
||||||
import java.io.DataOutput;
|
import java.io.DataOutput;
|
||||||
import java.io.IOException;
|
import java.io.IOException;
|
||||||
import java.util.Iterator;
|
import java.util.Iterator;
|
||||||
import java.util.Map;
|
import java.util.Map;
|
||||||
|
import java.util.concurrent.ConcurrentSkipListMap;
|
||||||
|
|
||||||
import com.google.common.collect.ImmutableSet;
|
|
||||||
import com.google.common.collect.Iterables;
|
|
||||||
import com.google.common.collect.Iterators;
|
|
||||||
import com.google.common.collect.Maps;
|
|
||||||
|
|
||||||
import org.apache.commons.logging.LogFactory;
|
|
||||||
import org.apache.commons.logging.Log;
|
import org.apache.commons.logging.Log;
|
||||||
|
import org.apache.commons.logging.LogFactory;
|
||||||
import org.apache.hadoop.classification.InterfaceAudience;
|
import org.apache.hadoop.classification.InterfaceAudience;
|
||||||
import org.apache.hadoop.classification.InterfaceStability;
|
import org.apache.hadoop.classification.InterfaceStability;
|
||||||
import org.apache.hadoop.io.Text;
|
import org.apache.hadoop.io.Text;
|
||||||
|
@ -40,7 +39,10 @@ import org.apache.hadoop.mapreduce.Counter;
|
||||||
import org.apache.hadoop.mapreduce.FileSystemCounter;
|
import org.apache.hadoop.mapreduce.FileSystemCounter;
|
||||||
import org.apache.hadoop.mapreduce.JobCounter;
|
import org.apache.hadoop.mapreduce.JobCounter;
|
||||||
import org.apache.hadoop.mapreduce.TaskCounter;
|
import org.apache.hadoop.mapreduce.TaskCounter;
|
||||||
import static org.apache.hadoop.mapreduce.counters.CounterGroupFactory.*;
|
|
||||||
|
import com.google.common.collect.Iterables;
|
||||||
|
import com.google.common.collect.Iterators;
|
||||||
|
import com.google.common.collect.Maps;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* An abstract class to provide common implementation for the Counters
|
* An abstract class to provide common implementation for the Counters
|
||||||
|
@ -61,8 +63,10 @@ public abstract class AbstractCounters<C extends Counter,
|
||||||
* A cache from enum values to the associated counter.
|
* A cache from enum values to the associated counter.
|
||||||
*/
|
*/
|
||||||
private Map<Enum<?>, C> cache = Maps.newIdentityHashMap();
|
private Map<Enum<?>, C> cache = Maps.newIdentityHashMap();
|
||||||
private Map<String, G> fgroups = Maps.newTreeMap(); // framework & fs groups
|
//framework & fs groups
|
||||||
private Map<String, G> groups = Maps.newTreeMap(); // other groups
|
private Map<String, G> fgroups = new ConcurrentSkipListMap<String, G>();
|
||||||
|
// other groups
|
||||||
|
private Map<String, G> groups = new ConcurrentSkipListMap<String, G>();
|
||||||
private final CounterGroupFactory<C, G> groupFactory;
|
private final CounterGroupFactory<C, G> groupFactory;
|
||||||
|
|
||||||
// For framework counter serialization without strings
|
// For framework counter serialization without strings
|
||||||
|
@ -181,14 +185,13 @@ public abstract class AbstractCounters<C extends Counter,
|
||||||
* @return Set of counter names.
|
* @return Set of counter names.
|
||||||
*/
|
*/
|
||||||
public synchronized Iterable<String> getGroupNames() {
|
public synchronized Iterable<String> getGroupNames() {
|
||||||
return Iterables.concat(ImmutableSet.copyOf(fgroups.keySet()),
|
return Iterables.concat(fgroups.keySet(), groups.keySet());
|
||||||
ImmutableSet.copyOf(groups.keySet()));
|
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public synchronized Iterator<G> iterator() {
|
public Iterator<G> iterator() {
|
||||||
return Iterators.concat(ImmutableSet.copyOf(fgroups.values()).iterator(),
|
return Iterators.concat(fgroups.values().iterator(),
|
||||||
ImmutableSet.copyOf(groups.values()).iterator());
|
groups.values().iterator());
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
@ -216,7 +219,7 @@ public abstract class AbstractCounters<C extends Counter,
|
||||||
private String filterGroupName(String oldName) {
|
private String filterGroupName(String oldName) {
|
||||||
String newName = legacyMap.get(oldName);
|
String newName = legacyMap.get(oldName);
|
||||||
if (newName == null) {
|
if (newName == null) {
|
||||||
return limits.filterGroupName(oldName);
|
return Limits.filterGroupName(oldName);
|
||||||
}
|
}
|
||||||
LOG.warn("Group "+ oldName +" is deprecated. Use "+ newName +" instead");
|
LOG.warn("Group "+ oldName +" is deprecated. Use "+ newName +" instead");
|
||||||
return newName;
|
return newName;
|
||||||
|
|
|
@ -42,11 +42,11 @@ public class Limits {
|
||||||
return name.length() > maxLen ? name.substring(0, maxLen - 1) : name;
|
return name.length() > maxLen ? name.substring(0, maxLen - 1) : name;
|
||||||
}
|
}
|
||||||
|
|
||||||
public String filterCounterName(String name) {
|
public static String filterCounterName(String name) {
|
||||||
return filterName(name, COUNTER_NAME_MAX);
|
return filterName(name, COUNTER_NAME_MAX);
|
||||||
}
|
}
|
||||||
|
|
||||||
public String filterGroupName(String name) {
|
public static String filterGroupName(String name) {
|
||||||
return filterName(name, GROUP_NAME_MAX);
|
return filterName(name, GROUP_NAME_MAX);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
Loading…
Reference in New Issue