MAPREDUCE-4049. Experimental api to allow for alternate shuffle plugins. Contributed by Anver BenHanoch.

git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/branches/branch-2@1438793 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Alejandro Abdelnur 2013-01-26 00:38:38 +00:00
parent 578bc2ce0c
commit c4c039bb29
7 changed files with 438 additions and 51 deletions

View File

@ -18,6 +18,9 @@ Release 2.0.3-alpha - Unreleased
MAPREDUCE-4810. Added new admin command options for MR AM. (Jerry Chen via MAPREDUCE-4810. Added new admin command options for MR AM. (Jerry Chen via
vinodkv) vinodkv)
MAPREDUCE-4049. Experimental api to allow for alternate shuffle plugins.
(Avner BenHanoch via acmurthy)
IMPROVEMENTS IMPROVEMENTS
MAPREDUCE-3678. The Map tasks logs should have the value of input MAPREDUCE-3678. The Map tasks logs should have the value of input

View File

@ -340,6 +340,7 @@ public class ReduceTask extends Task {
// Initialize the codec // Initialize the codec
codec = initCodec(); codec = initCodec();
RawKeyValueIterator rIter = null; RawKeyValueIterator rIter = null;
ShuffleConsumerPlugin shuffleConsumerPlugin = null;
boolean isLocal = false; boolean isLocal = false;
// local if // local if
@ -358,8 +359,14 @@ public class ReduceTask extends Task {
(null != combinerClass) ? (null != combinerClass) ?
new CombineOutputCollector(reduceCombineOutputCounter, reporter, conf) : null; new CombineOutputCollector(reduceCombineOutputCounter, reporter, conf) : null;
Shuffle shuffle = Class<? extends ShuffleConsumerPlugin> clazz =
new Shuffle(getTaskID(), job, FileSystem.getLocal(job), umbilical, job.getClass(MRConfig.SHUFFLE_CONSUMER_PLUGIN, Shuffle.class, ShuffleConsumerPlugin.class);
shuffleConsumerPlugin = ReflectionUtils.newInstance(clazz, job);
LOG.info("Using ShuffleConsumerPlugin: " + shuffleConsumerPlugin);
ShuffleConsumerPlugin.Context shuffleContext =
new ShuffleConsumerPlugin.Context(getTaskID(), job, FileSystem.getLocal(job), umbilical,
super.lDirAlloc, reporter, codec, super.lDirAlloc, reporter, codec,
combinerClass, combineCollector, combinerClass, combineCollector,
spilledRecordsCounter, reduceCombineInputCounter, spilledRecordsCounter, reduceCombineInputCounter,
@ -368,7 +375,8 @@ public class ReduceTask extends Task {
mergedMapOutputsCounter, mergedMapOutputsCounter,
taskStatus, copyPhase, sortPhase, this, taskStatus, copyPhase, sortPhase, this,
mapOutputFile); mapOutputFile);
rIter = shuffle.run(); shuffleConsumerPlugin.init(shuffleContext);
rIter = shuffleConsumerPlugin.run();
} else { } else {
// local job runner doesn't have a copy phase // local job runner doesn't have a copy phase
copyPhase.complete(); copyPhase.complete();
@ -399,6 +407,10 @@ public class ReduceTask extends Task {
runOldReducer(job, umbilical, reporter, rIter, comparator, runOldReducer(job, umbilical, reporter, rIter, comparator,
keyClass, valueClass); keyClass, valueClass);
} }
if (shuffleConsumerPlugin != null) {
shuffleConsumerPlugin.close();
}
done(umbilical, reporter); done(umbilical, reporter);
} }

View File

@ -0,0 +1,168 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.mapred;
import java.io.IOException;
import org.apache.hadoop.mapred.Task.CombineOutputCollector;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.LocalDirAllocator;
import org.apache.hadoop.io.compress.CompressionCodec;
import org.apache.hadoop.util.Progress;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
/**
* ShuffleConsumerPlugin for serving Reducers. It may shuffle MOF files from
* either the built-in ShuffleHandler or from a 3rd party AuxiliaryService.
*
*/
@InterfaceAudience.LimitedPrivate("mapreduce")
@InterfaceStability.Unstable
public interface ShuffleConsumerPlugin<K, V> {
public void init(Context<K, V> context);
public RawKeyValueIterator run() throws IOException, InterruptedException;
public void close();
@InterfaceAudience.LimitedPrivate("mapreduce")
@InterfaceStability.Unstable
public static class Context<K,V> {
private final org.apache.hadoop.mapreduce.TaskAttemptID reduceId;
private final JobConf jobConf;
private final FileSystem localFS;
private final TaskUmbilicalProtocol umbilical;
private final LocalDirAllocator localDirAllocator;
private final Reporter reporter;
private final CompressionCodec codec;
private final Class<? extends Reducer> combinerClass;
private final CombineOutputCollector<K, V> combineCollector;
private final Counters.Counter spilledRecordsCounter;
private final Counters.Counter reduceCombineInputCounter;
private final Counters.Counter shuffledMapsCounter;
private final Counters.Counter reduceShuffleBytes;
private final Counters.Counter failedShuffleCounter;
private final Counters.Counter mergedMapOutputsCounter;
private final TaskStatus status;
private final Progress copyPhase;
private final Progress mergePhase;
private final Task reduceTask;
private final MapOutputFile mapOutputFile;
public Context(org.apache.hadoop.mapreduce.TaskAttemptID reduceId,
JobConf jobConf, FileSystem localFS,
TaskUmbilicalProtocol umbilical,
LocalDirAllocator localDirAllocator,
Reporter reporter, CompressionCodec codec,
Class<? extends Reducer> combinerClass,
CombineOutputCollector<K,V> combineCollector,
Counters.Counter spilledRecordsCounter,
Counters.Counter reduceCombineInputCounter,
Counters.Counter shuffledMapsCounter,
Counters.Counter reduceShuffleBytes,
Counters.Counter failedShuffleCounter,
Counters.Counter mergedMapOutputsCounter,
TaskStatus status, Progress copyPhase, Progress mergePhase,
Task reduceTask, MapOutputFile mapOutputFile) {
this.reduceId = reduceId;
this.jobConf = jobConf;
this.localFS = localFS;
this. umbilical = umbilical;
this.localDirAllocator = localDirAllocator;
this.reporter = reporter;
this.codec = codec;
this.combinerClass = combinerClass;
this.combineCollector = combineCollector;
this.spilledRecordsCounter = spilledRecordsCounter;
this.reduceCombineInputCounter = reduceCombineInputCounter;
this.shuffledMapsCounter = shuffledMapsCounter;
this.reduceShuffleBytes = reduceShuffleBytes;
this.failedShuffleCounter = failedShuffleCounter;
this.mergedMapOutputsCounter = mergedMapOutputsCounter;
this.status = status;
this.copyPhase = copyPhase;
this.mergePhase = mergePhase;
this.reduceTask = reduceTask;
this.mapOutputFile = mapOutputFile;
}
public org.apache.hadoop.mapreduce.TaskAttemptID getReduceId() {
return reduceId;
}
public JobConf getJobConf() {
return jobConf;
}
public FileSystem getLocalFS() {
return localFS;
}
public TaskUmbilicalProtocol getUmbilical() {
return umbilical;
}
public LocalDirAllocator getLocalDirAllocator() {
return localDirAllocator;
}
public Reporter getReporter() {
return reporter;
}
public CompressionCodec getCodec() {
return codec;
}
public Class<? extends Reducer> getCombinerClass() {
return combinerClass;
}
public CombineOutputCollector<K, V> getCombineCollector() {
return combineCollector;
}
public Counters.Counter getSpilledRecordsCounter() {
return spilledRecordsCounter;
}
public Counters.Counter getReduceCombineInputCounter() {
return reduceCombineInputCounter;
}
public Counters.Counter getShuffledMapsCounter() {
return shuffledMapsCounter;
}
public Counters.Counter getReduceShuffleBytes() {
return reduceShuffleBytes;
}
public Counters.Counter getFailedShuffleCounter() {
return failedShuffleCounter;
}
public Counters.Counter getMergedMapOutputsCounter() {
return mergedMapOutputsCounter;
}
public TaskStatus getStatus() {
return status;
}
public Progress getCopyPhase() {
return copyPhase;
}
public Progress getMergePhase() {
return mergePhase;
}
public Task getReduceTask() {
return reduceTask;
}
public MapOutputFile getMapOutputFile() {
return mapOutputFile;
}
} // end of public static class Context<K,V>
}

View File

@ -85,6 +85,9 @@ public interface MRConfig {
public static final boolean SHUFFLE_SSL_ENABLED_DEFAULT = false; public static final boolean SHUFFLE_SSL_ENABLED_DEFAULT = false;
public static final String SHUFFLE_CONSUMER_PLUGIN =
"mapreduce.job.reduce.shuffle.consumer.plugin.class";
/** /**
* Configuration key to enable/disable IFile readahead. * Configuration key to enable/disable IFile readahead.
*/ */

View File

@ -34,73 +34,63 @@ import org.apache.hadoop.mapred.Task;
import org.apache.hadoop.mapred.Task.CombineOutputCollector; import org.apache.hadoop.mapred.Task.CombineOutputCollector;
import org.apache.hadoop.mapred.TaskStatus; import org.apache.hadoop.mapred.TaskStatus;
import org.apache.hadoop.mapred.TaskUmbilicalProtocol; import org.apache.hadoop.mapred.TaskUmbilicalProtocol;
import org.apache.hadoop.mapred.ShuffleConsumerPlugin;
import org.apache.hadoop.mapreduce.MRJobConfig; import org.apache.hadoop.mapreduce.MRJobConfig;
import org.apache.hadoop.mapreduce.TaskAttemptID; import org.apache.hadoop.mapreduce.TaskAttemptID;
import org.apache.hadoop.util.Progress; import org.apache.hadoop.util.Progress;
@InterfaceAudience.Private @InterfaceAudience.LimitedPrivate("mapreduce")
@InterfaceStability.Unstable @InterfaceStability.Unstable
@SuppressWarnings({"unchecked", "rawtypes"}) @SuppressWarnings({"unchecked", "rawtypes"})
public class Shuffle<K, V> implements ExceptionReporter { public class Shuffle<K, V> implements ShuffleConsumerPlugin<K, V>, ExceptionReporter {
private static final int PROGRESS_FREQUENCY = 2000; private static final int PROGRESS_FREQUENCY = 2000;
private static final int MAX_EVENTS_TO_FETCH = 10000; private static final int MAX_EVENTS_TO_FETCH = 10000;
private static final int MIN_EVENTS_TO_FETCH = 100; private static final int MIN_EVENTS_TO_FETCH = 100;
private static final int MAX_RPC_OUTSTANDING_EVENTS = 3000000; private static final int MAX_RPC_OUTSTANDING_EVENTS = 3000000;
private final TaskAttemptID reduceId; private ShuffleConsumerPlugin.Context context;
private final JobConf jobConf;
private final Reporter reporter;
private final ShuffleClientMetrics metrics;
private final TaskUmbilicalProtocol umbilical;
private final ShuffleScheduler<K,V> scheduler; private TaskAttemptID reduceId;
private final MergeManager<K, V> merger; private JobConf jobConf;
private Reporter reporter;
private ShuffleClientMetrics metrics;
private TaskUmbilicalProtocol umbilical;
private ShuffleScheduler<K,V> scheduler;
private MergeManager<K, V> merger;
private Throwable throwable = null; private Throwable throwable = null;
private String throwingThreadName = null; private String throwingThreadName = null;
private final Progress copyPhase; private Progress copyPhase;
private final TaskStatus taskStatus; private TaskStatus taskStatus;
private final Task reduceTask; //Used for status updates private Task reduceTask; //Used for status updates
public Shuffle(TaskAttemptID reduceId, JobConf jobConf, FileSystem localFS, @Override
TaskUmbilicalProtocol umbilical, public void init(ShuffleConsumerPlugin.Context context) {
LocalDirAllocator localDirAllocator, this.context = context;
Reporter reporter,
CompressionCodec codec, this.reduceId = context.getReduceId();
Class<? extends Reducer> combinerClass, this.jobConf = context.getJobConf();
CombineOutputCollector<K,V> combineCollector, this.umbilical = context.getUmbilical();
Counters.Counter spilledRecordsCounter, this.reporter = context.getReporter();
Counters.Counter reduceCombineInputCounter,
Counters.Counter shuffledMapsCounter,
Counters.Counter reduceShuffleBytes,
Counters.Counter failedShuffleCounter,
Counters.Counter mergedMapOutputsCounter,
TaskStatus status,
Progress copyPhase,
Progress mergePhase,
Task reduceTask,
MapOutputFile mapOutputFile) {
this.reduceId = reduceId;
this.jobConf = jobConf;
this.umbilical = umbilical;
this.reporter = reporter;
this.metrics = new ShuffleClientMetrics(reduceId, jobConf); this.metrics = new ShuffleClientMetrics(reduceId, jobConf);
this.copyPhase = copyPhase; this.copyPhase = context.getCopyPhase();
this.taskStatus = status; this.taskStatus = context.getStatus();
this.reduceTask = reduceTask; this.reduceTask = context.getReduceTask();
scheduler = scheduler =
new ShuffleScheduler<K,V>(jobConf, status, this, copyPhase, new ShuffleScheduler<K,V>(jobConf, taskStatus, this, copyPhase,
shuffledMapsCounter, context.getShuffledMapsCounter(),
reduceShuffleBytes, failedShuffleCounter); context.getReduceShuffleBytes(), context.getFailedShuffleCounter());
merger = new MergeManager<K, V>(reduceId, jobConf, localFS, merger = new MergeManager<K, V>(reduceId, jobConf, context.getLocalFS(),
localDirAllocator, reporter, codec, context.getLocalDirAllocator(), reporter, context.getCodec(),
combinerClass, combineCollector, context.getCombinerClass(), context.getCombineCollector(),
spilledRecordsCounter, context.getSpilledRecordsCounter(),
reduceCombineInputCounter, context.getReduceCombineInputCounter(),
mergedMapOutputsCounter, context.getMergedMapOutputsCounter(),
this, mergePhase, mapOutputFile); this, context.getMergePhase(), context.getMapOutputFile());
} }
@Override
public RawKeyValueIterator run() throws IOException, InterruptedException { public RawKeyValueIterator run() throws IOException, InterruptedException {
// Scale the maximum events we fetch per RPC call to mitigate OOM issues // Scale the maximum events we fetch per RPC call to mitigate OOM issues
// on the ApplicationMaster when a thundering herd of reducers fetch events // on the ApplicationMaster when a thundering herd of reducers fetch events
@ -171,6 +161,10 @@ public class Shuffle<K, V> implements ExceptionReporter {
return kvIter; return kvIter;
} }
@Override
public void close(){
}
public synchronized void reportException(Throwable t) { public synchronized void reportException(Throwable t) {
if (throwable == null) { if (throwable == null) {
throwable = t; throwable = t;

View File

@ -1162,6 +1162,16 @@
</description> </description>
</property> </property>
<property>
<name>mapreduce.job.reduce.shuffle.consumer.plugin.class</name>
<value>org.apache.hadoop.mapreduce.task.reduce.Shuffle</value>
<description>
Name of the class whose instance will be used
to send shuffle requests by reducetasks of this job.
The class must be an instance of org.apache.hadoop.mapred.ShuffleConsumerPlugin.
</description>
</property>
<!-- Node health script variables --> <!-- Node health script variables -->
<property> <property>

View File

@ -0,0 +1,197 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.mapreduce;
import org.junit.Test;
import static org.junit.Assert.*;
import static org.mockito.Mockito.*;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.fs.LocalDirAllocator;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.mapred.Task.CombineOutputCollector;
import org.apache.hadoop.io.compress.CompressionCodec;
import org.apache.hadoop.util.Progress;
import org.apache.hadoop.mapred.Reporter;
import org.apache.hadoop.util.ReflectionUtils;
import org.apache.hadoop.mapreduce.task.reduce.Shuffle;
import org.apache.hadoop.mapred.Counters;
import org.apache.hadoop.mapred.Counters.Counter;
import org.apache.hadoop.mapred.MapOutputFile;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.Task;
import org.apache.hadoop.mapred.ReduceTask;
import org.apache.hadoop.mapred.TaskStatus;
import org.apache.hadoop.mapred.TaskUmbilicalProtocol;
import org.apache.hadoop.mapred.ShuffleConsumerPlugin;
import org.apache.hadoop.mapred.RawKeyValueIterator;
import org.apache.hadoop.mapred.Reducer;
/**
* A JUnit for testing availability and accessibility of shuffle related API.
* It is needed for maintaining comptability with external sub-classes of
* ShuffleConsumerPlugin and AuxiliaryService(s) like ShuffleHandler.
*
* The importance of this test is for preserving API with 3rd party plugins.
*/
public class TestShufflePlugin<K, V> {
static class TestShuffleConsumerPlugin<K, V> implements ShuffleConsumerPlugin<K, V> {
@Override
public void init(ShuffleConsumerPlugin.Context<K, V> context) {
// just verify that Context has kept its public interface
context.getReduceId();
context.getJobConf();
context.getLocalFS();
context.getUmbilical();
context.getLocalDirAllocator();
context.getReporter();
context.getCodec();
context.getCombinerClass();
context.getCombineCollector();
context.getSpilledRecordsCounter();
context.getReduceCombineInputCounter();
context.getShuffledMapsCounter();
context.getReduceShuffleBytes();
context.getFailedShuffleCounter();
context.getMergedMapOutputsCounter();
context.getStatus();
context.getCopyPhase();
context.getMergePhase();
context.getReduceTask();
context.getMapOutputFile();
}
@Override
public void close(){
}
@Override
public RawKeyValueIterator run() throws java.io.IOException, java.lang.InterruptedException{
return null;
}
}
@Test
/**
* A testing method instructing core hadoop to load an external ShuffleConsumerPlugin
* as if it came from a 3rd party.
*/
public void testPluginAbility() {
try{
// create JobConf with mapreduce.job.shuffle.consumer.plugin=TestShuffleConsumerPlugin
JobConf jobConf = new JobConf();
jobConf.setClass(MRConfig.SHUFFLE_CONSUMER_PLUGIN,
TestShufflePlugin.TestShuffleConsumerPlugin.class,
ShuffleConsumerPlugin.class);
ShuffleConsumerPlugin shuffleConsumerPlugin = null;
Class<? extends ShuffleConsumerPlugin> clazz =
jobConf.getClass(MRConfig.SHUFFLE_CONSUMER_PLUGIN, Shuffle.class, ShuffleConsumerPlugin.class);
assertNotNull("Unable to get " + MRConfig.SHUFFLE_CONSUMER_PLUGIN, clazz);
// load 3rd party plugin through core's factory method
shuffleConsumerPlugin = ReflectionUtils.newInstance(clazz, jobConf);
assertNotNull("Unable to load " + MRConfig.SHUFFLE_CONSUMER_PLUGIN, shuffleConsumerPlugin);
}
catch (Exception e) {
assertTrue("Threw exception:" + e, false);
}
}
@Test
/**
* A testing method verifying availability and accessibility of API that is needed
* for sub-classes of ShuffleConsumerPlugin
*/
public void testConsumerApi() {
JobConf jobConf = new JobConf();
ShuffleConsumerPlugin<K, V> shuffleConsumerPlugin = new TestShuffleConsumerPlugin<K, V>();
//mock creation
ReduceTask mockReduceTask = mock(ReduceTask.class);
TaskUmbilicalProtocol mockUmbilical = mock(TaskUmbilicalProtocol.class);
Reporter mockReporter = mock(Reporter.class);
FileSystem mockFileSystem = mock(FileSystem.class);
Class<? extends org.apache.hadoop.mapred.Reducer> combinerClass = jobConf.getCombinerClass();
@SuppressWarnings("unchecked") // needed for mock with generic
CombineOutputCollector<K, V> mockCombineOutputCollector =
(CombineOutputCollector<K, V>) mock(CombineOutputCollector.class);
org.apache.hadoop.mapreduce.TaskAttemptID mockTaskAttemptID =
mock(org.apache.hadoop.mapreduce.TaskAttemptID.class);
LocalDirAllocator mockLocalDirAllocator = mock(LocalDirAllocator.class);
CompressionCodec mockCompressionCodec = mock(CompressionCodec.class);
Counter mockCounter = mock(Counter.class);
TaskStatus mockTaskStatus = mock(TaskStatus.class);
Progress mockProgress = mock(Progress.class);
MapOutputFile mockMapOutputFile = mock(MapOutputFile.class);
Task mockTask = mock(Task.class);
try {
String [] dirs = jobConf.getLocalDirs();
// verify that these APIs are available through super class handler
ShuffleConsumerPlugin.Context<K, V> context =
new ShuffleConsumerPlugin.Context<K, V>(mockTaskAttemptID, jobConf, mockFileSystem,
mockUmbilical, mockLocalDirAllocator,
mockReporter, mockCompressionCodec,
combinerClass, mockCombineOutputCollector,
mockCounter, mockCounter, mockCounter,
mockCounter, mockCounter, mockCounter,
mockTaskStatus, mockProgress, mockProgress,
mockTask, mockMapOutputFile);
shuffleConsumerPlugin.init(context);
shuffleConsumerPlugin.run();
shuffleConsumerPlugin.close();
}
catch (Exception e) {
assertTrue("Threw exception:" + e, false);
}
// verify that these APIs are available for 3rd party plugins
mockReduceTask.getTaskID();
mockReduceTask.getJobID();
mockReduceTask.getNumMaps();
mockReduceTask.getPartition();
mockReporter.progress();
}
@Test
/**
* A testing method verifying availability and accessibility of API needed for
* AuxiliaryService(s) which are "Shuffle-Providers" (ShuffleHandler and 3rd party plugins)
*/
public void testProviderApi() {
ApplicationId mockApplicationId = mock(ApplicationId.class);
mockApplicationId.setClusterTimestamp(new Long(10));
mockApplicationId.setId(mock(JobID.class).getId());
LocalDirAllocator mockLocalDirAllocator = mock(LocalDirAllocator.class);
JobConf mockJobConf = mock(JobConf.class);
try {
mockLocalDirAllocator.getLocalPathToRead("", mockJobConf);
}
catch (Exception e) {
assertTrue("Threw exception:" + e, false);
}
}
}