From 04f685ea76a085a83aa2eb5a52f920a16df2b300 Mon Sep 17 00:00:00 2001 From: Arun Murthy Date: Wed, 8 Feb 2012 00:16:54 +0000 Subject: [PATCH] MAPREDUCE-3827. Changed Counters to use ConcurrentSkipListMap for performance. Contributed by Vinod K V. git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/trunk@1241711 13f79535-47bb-0310-9956-ffa450edef68 --- hadoop-mapreduce-project/CHANGES.txt | 3 ++ .../counters/AbstractCounterGroup.java | 28 +++++++++------- .../mapreduce/counters/AbstractCounters.java | 33 ++++++++++--------- .../hadoop/mapreduce/counters/Limits.java | 4 +-- 4 files changed, 39 insertions(+), 29 deletions(-) diff --git a/hadoop-mapreduce-project/CHANGES.txt b/hadoop-mapreduce-project/CHANGES.txt index 6d6215e58eb..c4f2b708dca 100644 --- a/hadoop-mapreduce-project/CHANGES.txt +++ b/hadoop-mapreduce-project/CHANGES.txt @@ -764,6 +764,9 @@ Release 0.23.1 - Unreleased MAPREDUCE-3823. Ensure counters are calculated only once after a job finishes. (Vinod Kumar Vavilapalli via sseth) + MAPREDUCE-3827. Changed Counters to use ConcurrentSkipListMap for + performance. (vinodkv via acmurthy) + Release 0.23.0 - 2011-11-01 INCOMPATIBLE CHANGES diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/counters/AbstractCounterGroup.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/counters/AbstractCounterGroup.java index 68fded801de..1b9935ba083 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/counters/AbstractCounterGroup.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/counters/AbstractCounterGroup.java @@ -22,11 +22,8 @@ import java.io.DataOutput; import java.io.IOException; import java.util.Iterator; -import java.util.Map; - -import com.google.common.collect.ImmutableSet; -import com.google.common.collect.Iterators; -import com.google.common.collect.Maps; +import java.util.concurrent.ConcurrentMap; +import java.util.concurrent.ConcurrentSkipListMap; import org.apache.hadoop.classification.InterfaceAudience; import org.apache.hadoop.io.Text; @@ -34,6 +31,8 @@ import org.apache.hadoop.mapreduce.Counter; import org.apache.hadoop.mapreduce.util.ResourceBundles; +import com.google.common.collect.Iterators; + /** * An abstract class to provide common implementation of the * generic counter group in both mapred and mapreduce package. @@ -46,7 +45,8 @@ public abstract class AbstractCounterGroup private final String name; private String displayName; - private final Map counters = Maps.newTreeMap(); + private final ConcurrentMap counters = + new ConcurrentSkipListMap(); private final Limits limits; public AbstractCounterGroup(String name, String displayName, @@ -80,7 +80,7 @@ public synchronized void addCounter(T counter) { @Override public synchronized T addCounter(String counterName, String displayName, long value) { - String saveName = limits.filterCounterName(counterName); + String saveName = Limits.filterCounterName(counterName); T counter = findCounterImpl(saveName, false); if (counter == null) { return addCounterImpl(saveName, displayName, value); @@ -97,7 +97,9 @@ private T addCounterImpl(String name, String displayName, long value) { @Override public synchronized T findCounter(String counterName, String displayName) { - String saveName = limits.filterCounterName(counterName); + // Take lock to avoid two threads not finding a counter and trying to add + // the same counter. + String saveName = Limits.filterCounterName(counterName); T counter = findCounterImpl(saveName, false); if (counter == null) { return addCounterImpl(saveName, displayName, 0); @@ -106,10 +108,12 @@ public synchronized T findCounter(String counterName, String displayName) { } @Override - public synchronized T findCounter(String counterName, boolean create) { - return findCounterImpl(limits.filterCounterName(counterName), create); + public T findCounter(String counterName, boolean create) { + return findCounterImpl(Limits.filterCounterName(counterName), create); } + // Lock the object. Cannot simply use concurrent constructs on the counters + // data-structure (like putIfAbsent) because of localization, limits etc. private synchronized T findCounterImpl(String counterName, boolean create) { T counter = counters.get(counterName); if (counter == null && create) { @@ -142,8 +146,8 @@ protected abstract T newCounter(String counterName, String displayName, protected abstract T newCounter(); @Override - public synchronized Iterator iterator() { - return ImmutableSet.copyOf(counters.values()).iterator(); + public Iterator iterator() { + return counters.values().iterator(); } /** diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/counters/AbstractCounters.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/counters/AbstractCounters.java index 7e15e6fcb51..73434ae9bdb 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/counters/AbstractCounters.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/counters/AbstractCounters.java @@ -18,19 +18,18 @@ package org.apache.hadoop.mapreduce.counters; +import static org.apache.hadoop.mapreduce.counters.CounterGroupFactory.getFrameworkGroupId; +import static org.apache.hadoop.mapreduce.counters.CounterGroupFactory.isFrameworkGroup; + import java.io.DataInput; import java.io.DataOutput; import java.io.IOException; import java.util.Iterator; import java.util.Map; +import java.util.concurrent.ConcurrentSkipListMap; -import com.google.common.collect.ImmutableSet; -import com.google.common.collect.Iterables; -import com.google.common.collect.Iterators; -import com.google.common.collect.Maps; - -import org.apache.commons.logging.LogFactory; import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; import org.apache.hadoop.classification.InterfaceAudience; import org.apache.hadoop.classification.InterfaceStability; import org.apache.hadoop.io.Text; @@ -40,7 +39,10 @@ import org.apache.hadoop.mapreduce.FileSystemCounter; import org.apache.hadoop.mapreduce.JobCounter; import org.apache.hadoop.mapreduce.TaskCounter; -import static org.apache.hadoop.mapreduce.counters.CounterGroupFactory.*; + +import com.google.common.collect.Iterables; +import com.google.common.collect.Iterators; +import com.google.common.collect.Maps; /** * An abstract class to provide common implementation for the Counters @@ -61,8 +63,10 @@ public abstract class AbstractCounters, C> cache = Maps.newIdentityHashMap(); - private Map fgroups = Maps.newTreeMap(); // framework & fs groups - private Map groups = Maps.newTreeMap(); // other groups + //framework & fs groups + private Map fgroups = new ConcurrentSkipListMap(); + // other groups + private Map groups = new ConcurrentSkipListMap(); private final CounterGroupFactory groupFactory; // For framework counter serialization without strings @@ -181,14 +185,13 @@ public synchronized C findCounter(String scheme, FileSystemCounter key) { * @return Set of counter names. */ public synchronized Iterable getGroupNames() { - return Iterables.concat(ImmutableSet.copyOf(fgroups.keySet()), - ImmutableSet.copyOf(groups.keySet())); + return Iterables.concat(fgroups.keySet(), groups.keySet()); } @Override - public synchronized Iterator iterator() { - return Iterators.concat(ImmutableSet.copyOf(fgroups.values()).iterator(), - ImmutableSet.copyOf(groups.values()).iterator()); + public Iterator iterator() { + return Iterators.concat(fgroups.values().iterator(), + groups.values().iterator()); } /** @@ -216,7 +219,7 @@ public synchronized G getGroup(String groupName) { private String filterGroupName(String oldName) { String newName = legacyMap.get(oldName); if (newName == null) { - return limits.filterGroupName(oldName); + return Limits.filterGroupName(oldName); } LOG.warn("Group "+ oldName +" is deprecated. Use "+ newName +" instead"); return newName; diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/counters/Limits.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/counters/Limits.java index 24740095834..d22ac7011a8 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/counters/Limits.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/counters/Limits.java @@ -42,11 +42,11 @@ public static String filterName(String name, int maxLen) { return name.length() > maxLen ? name.substring(0, maxLen - 1) : name; } - public String filterCounterName(String name) { + public static String filterCounterName(String name) { return filterName(name, COUNTER_NAME_MAX); } - public String filterGroupName(String name) { + public static String filterGroupName(String name) { return filterName(name, GROUP_NAME_MAX); }