diff --git a/hadoop-mapreduce-project/CHANGES.txt b/hadoop-mapreduce-project/CHANGES.txt index d7e73cb9e67..1aafb435952 100644 --- a/hadoop-mapreduce-project/CHANGES.txt +++ b/hadoop-mapreduce-project/CHANGES.txt @@ -28,6 +28,9 @@ Release 2.6.0 - UNRELEASED MAPREDUCE-5906. Inconsistent configuration in property "mapreduce.reduce.shuffle.input.buffer.percent" (Akira AJISAKA via aw) + MAPREDUCE-5974. Allow specifying multiple MapOutputCollectors with + fallback. (Todd Lipcon via kasha) + OPTIMIZATIONS BUG FIXES diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/MapTask.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/MapTask.java index 84fdd92cc5d..b00bc3c6ff3 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/MapTask.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/MapTask.java @@ -380,16 +380,35 @@ public class MapTask extends Task { private MapOutputCollector createSortingCollector(JobConf job, TaskReporter reporter) throws IOException, ClassNotFoundException { - MapOutputCollector collector - = (MapOutputCollector) - ReflectionUtils.newInstance( - job.getClass(JobContext.MAP_OUTPUT_COLLECTOR_CLASS_ATTR, - MapOutputBuffer.class, MapOutputCollector.class), job); - LOG.info("Map output collector class = " + collector.getClass().getName()); MapOutputCollector.Context context = - new MapOutputCollector.Context(this, job, reporter); - collector.init(context); - return collector; + new MapOutputCollector.Context(this, job, reporter); + + Class[] collectorClasses = job.getClasses( + JobContext.MAP_OUTPUT_COLLECTOR_CLASS_ATTR, MapOutputBuffer.class); + int remainingCollectors = collectorClasses.length; + for (Class clazz : collectorClasses) { + try { + if (!MapOutputCollector.class.isAssignableFrom(clazz)) { + throw new IOException("Invalid output collector class: " + clazz.getName() + + " (does not implement MapOutputCollector)"); + } + Class subclazz = + clazz.asSubclass(MapOutputCollector.class); + LOG.debug("Trying map output collector class: " + subclazz.getName()); + MapOutputCollector collector = + ReflectionUtils.newInstance(subclazz, job); + collector.init(context); + LOG.info("Map output collector class = " + collector.getClass().getName()); + return collector; + } catch (Exception e) { + String msg = "Unable to initialize MapOutputCollector " + clazz.getName(); + if (--remainingCollectors > 0) { + msg += " (" + remainingCollectors + " more collector(s) to try)"; + } + LOG.warn(msg, e); + } + } + throw new IOException("Unable to initialize any output collector"); } @SuppressWarnings("unchecked") diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/resources/mapred-default.xml b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/resources/mapred-default.xml index bc612d2552f..6ab8dce3a35 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/resources/mapred-default.xml +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/resources/mapred-default.xml @@ -664,7 +664,9 @@ mapreduce.job.map.output.collector.class org.apache.hadoop.mapred.MapTask$MapOutputBuffer - It defines the MapOutputCollector implementation to use. + The MapOutputCollector implementation(s) to use. This may be a comma-separated + list of class names, in which case the map task will try to initialize each + of the collectors in turn. The first to successfully initialize will be used. diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/site/apt/PluggableShuffleAndPluggableSort.apt.vm b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/site/apt/PluggableShuffleAndPluggableSort.apt.vm index 1b06ca9bfbe..06d802213d9 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/site/apt/PluggableShuffleAndPluggableSort.apt.vm +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/site/apt/PluggableShuffleAndPluggableSort.apt.vm @@ -71,11 +71,16 @@ Hadoop MapReduce Next Generation - Pluggable Shuffle and Pluggable Sort *--------------------------------------+---------------------+-----------------+ | <<>> | <<>> | The <<>> implementation to use | *--------------------------------------+---------------------+-----------------+ -| <<>> | <<>> | The <<>> implementation to use | +| <<>> | <<>> | The <<>> implementation(s) to use | *--------------------------------------+---------------------+-----------------+ These properties can also be set in the <<>> to change the default values for all jobs. + The collector class configuration may specify a comma-separated list of collector implementations. + In this case, the map task will attempt to instantiate each in turn until one of the + implementations successfully initializes. This can be useful if a given collector + implementation is only compatible with certain types of keys or values, for example. + ** NodeManager Configuration properties, <<>> in all nodes: *--------------------------------------+---------------------+-----------------+ @@ -91,4 +96,3 @@ Hadoop MapReduce Next Generation - Pluggable Shuffle and Pluggable Sort <<>> property, for example <<>>. Then the property defining the corresponding class must be <<>>. - \ No newline at end of file