From c4c039bb294c5a9d53d515a7dfba44a747aca36a Mon Sep 17 00:00:00 2001
From: Alejandro Abdelnur
Date: Sat, 26 Jan 2013 00:38:38 +0000
Subject: [PATCH] MAPREDUCE-4049. Experimental api to allow for alternate
 shuffle plugins. Contributed by Avner BenHanoch.

git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/branches/branch-2@1438793 13f79535-47bb-0310-9956-ffa450edef68
---
 hadoop-mapreduce-project/CHANGES.txt          |   3 +
 .../org/apache/hadoop/mapred/ReduceTask.java  |  18 +-
 .../hadoop/mapred/ShuffleConsumerPlugin.java  | 168 +++++++++++++++
 .../org/apache/hadoop/mapreduce/MRConfig.java |   3 +
 .../hadoop/mapreduce/task/reduce/Shuffle.java |  90 ++++----
 .../src/main/resources/mapred-default.xml     |  10 +
 .../hadoop/mapreduce/TestShufflePlugin.java   | 197 ++++++++++++++++++
 7 files changed, 438 insertions(+), 51 deletions(-)
 create mode 100644 hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/ShuffleConsumerPlugin.java
 create mode 100644 hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapreduce/TestShufflePlugin.java

diff --git a/hadoop-mapreduce-project/CHANGES.txt b/hadoop-mapreduce-project/CHANGES.txt
index 07b4c5568e6..0e33d3c16db 100644
--- a/hadoop-mapreduce-project/CHANGES.txt
+++ b/hadoop-mapreduce-project/CHANGES.txt
@@ -18,6 +18,9 @@ Release 2.0.3-alpha - Unreleased
     MAPREDUCE-4810. Added new admin command options for MR AM. (Jerry Chen
     via vinodkv)
 
+    MAPREDUCE-4049. Experimental api to allow for alternate shuffle plugins.
+    (Avner BenHanoch via acmurthy)
+
   IMPROVEMENTS
 
     MAPREDUCE-3678. The Map tasks logs should have the value of input
diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/ReduceTask.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/ReduceTask.java
index 4e48c21a3e0..e8d97fab6ef 100644
--- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/ReduceTask.java
+++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/ReduceTask.java
@@ -340,6 +340,7 @@ public class ReduceTask extends Task {
     // Initialize the codec
     codec = initCodec();
     RawKeyValueIterator rIter = null;
+    ShuffleConsumerPlugin shuffleConsumerPlugin = null;
 
     boolean isLocal = false;
     // local if
@@ -358,8 +359,14 @@ public class ReduceTask extends Task {
                      (null != combinerClass) ?
                      new CombineOutputCollector(reduceCombineOutputCounter, reporter, conf) : null;
 
-    Shuffle shuffle =
-      new Shuffle(getTaskID(), job, FileSystem.getLocal(job), umbilical,
+    Class<? extends ShuffleConsumerPlugin> clazz =
+      job.getClass(MRConfig.SHUFFLE_CONSUMER_PLUGIN, Shuffle.class, ShuffleConsumerPlugin.class);
+
+    shuffleConsumerPlugin = ReflectionUtils.newInstance(clazz, job);
+    LOG.info("Using ShuffleConsumerPlugin: " + shuffleConsumerPlugin);
+
+    ShuffleConsumerPlugin.Context shuffleContext =
+      new ShuffleConsumerPlugin.Context(getTaskID(), job, FileSystem.getLocal(job), umbilical,
                   super.lDirAlloc, reporter, codec,
                   combinerClass, combineCollector,
                   spilledRecordsCounter, reduceCombineInputCounter,
@@ -368,7 +375,8 @@ public class ReduceTask extends Task {
                   mergedMapOutputsCounter,
                   taskStatus, copyPhase, sortPhase, this,
                   mapOutputFile);
-    rIter = shuffle.run();
+    shuffleConsumerPlugin.init(shuffleContext);
+    rIter = shuffleConsumerPlugin.run();
     } else {
       // local job runner doesn't have a copy phase
       copyPhase.complete();
@@ -399,6 +407,10 @@ public class ReduceTask extends Task {
       runOldReducer(job, umbilical, reporter, rIter, comparator,
                     keyClass, valueClass);
     }
+
+    if (shuffleConsumerPlugin != null) {
+      shuffleConsumerPlugin.close();
+    }
 
     done(umbilical, reporter);
   }
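With the ReduceTask change above, the reduce side now resolves its shuffle
implementation from configuration at runtime and drives it through a fixed
lifecycle: init(Context), then run(), then close(). The built-in Shuffle class
remains the default whenever the new key is unset. A minimal per-job opt-in
sketch, assuming the usual MRConfig/JobConf/ShuffleConsumerPlugin imports
(ThirdPartyShuffle is a hypothetical plugin class, not part of this patch):

    JobConf job = new JobConf();
    // Select an alternate shuffle consumer for this job; ReduceTask falls
    // back to the built-in Shuffle when this key is not set.
    job.setClass(MRConfig.SHUFFLE_CONSUMER_PLUGIN,
                 ThirdPartyShuffle.class, ShuffleConsumerPlugin.class);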
diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/ShuffleConsumerPlugin.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/ShuffleConsumerPlugin.java
new file mode 100644
index 00000000000..f57275255f1
--- /dev/null
+++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/ShuffleConsumerPlugin.java
@@ -0,0 +1,168 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.mapred;
+
+import java.io.IOException;
+import org.apache.hadoop.mapred.Task.CombineOutputCollector;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.LocalDirAllocator;
+import org.apache.hadoop.io.compress.CompressionCodec;
+import org.apache.hadoop.util.Progress;
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceStability;
+
+/**
+ * ShuffleConsumerPlugin for serving Reducers.  It may shuffle MOF files from
+ * either the built-in ShuffleHandler or from a 3rd party AuxiliaryService.
+ *
+ */
+@InterfaceAudience.LimitedPrivate("mapreduce")
+@InterfaceStability.Unstable
+public interface ShuffleConsumerPlugin<K, V> {
+
+  public void init(Context<K, V> context);
+
+  public RawKeyValueIterator run() throws IOException, InterruptedException;
+
+  public void close();
+
+  @InterfaceAudience.LimitedPrivate("mapreduce")
+  @InterfaceStability.Unstable
+  public static class Context<K, V> {
+    private final org.apache.hadoop.mapreduce.TaskAttemptID reduceId;
+    private final JobConf jobConf;
+    private final FileSystem localFS;
+    private final TaskUmbilicalProtocol umbilical;
+    private final LocalDirAllocator localDirAllocator;
+    private final Reporter reporter;
+    private final CompressionCodec codec;
+    private final Class<? extends Reducer> combinerClass;
+    private final CombineOutputCollector<K, V> combineCollector;
+    private final Counters.Counter spilledRecordsCounter;
+    private final Counters.Counter reduceCombineInputCounter;
+    private final Counters.Counter shuffledMapsCounter;
+    private final Counters.Counter reduceShuffleBytes;
+    private final Counters.Counter failedShuffleCounter;
+    private final Counters.Counter mergedMapOutputsCounter;
+    private final TaskStatus status;
+    private final Progress copyPhase;
+    private final Progress mergePhase;
+    private final Task reduceTask;
+    private final MapOutputFile mapOutputFile;
+
+    public Context(org.apache.hadoop.mapreduce.TaskAttemptID reduceId,
+                   JobConf jobConf, FileSystem localFS,
+                   TaskUmbilicalProtocol umbilical,
+                   LocalDirAllocator localDirAllocator,
+                   Reporter reporter, CompressionCodec codec,
+                   Class<? extends Reducer> combinerClass,
+                   CombineOutputCollector<K, V> combineCollector,
+                   Counters.Counter spilledRecordsCounter,
+                   Counters.Counter reduceCombineInputCounter,
+                   Counters.Counter shuffledMapsCounter,
+                   Counters.Counter reduceShuffleBytes,
+                   Counters.Counter failedShuffleCounter,
+                   Counters.Counter mergedMapOutputsCounter,
+                   TaskStatus status, Progress copyPhase, Progress mergePhase,
+                   Task reduceTask, MapOutputFile mapOutputFile) {
+      this.reduceId = reduceId;
+      this.jobConf = jobConf;
+      this.localFS = localFS;
+      this.umbilical = umbilical;
+      this.localDirAllocator = localDirAllocator;
+      this.reporter = reporter;
+      this.codec = codec;
+      this.combinerClass = combinerClass;
+      this.combineCollector = combineCollector;
+      this.spilledRecordsCounter = spilledRecordsCounter;
+      this.reduceCombineInputCounter = reduceCombineInputCounter;
+      this.shuffledMapsCounter = shuffledMapsCounter;
+      this.reduceShuffleBytes = reduceShuffleBytes;
+      this.failedShuffleCounter = failedShuffleCounter;
+      this.mergedMapOutputsCounter = mergedMapOutputsCounter;
+      this.status = status;
+      this.copyPhase = copyPhase;
+      this.mergePhase = mergePhase;
+      this.reduceTask = reduceTask;
+      this.mapOutputFile = mapOutputFile;
+    }
+
+    public org.apache.hadoop.mapreduce.TaskAttemptID getReduceId() {
+      return reduceId;
+    }
+    public JobConf getJobConf() {
+      return jobConf;
+    }
+    public FileSystem getLocalFS() {
+      return localFS;
+    }
+    public TaskUmbilicalProtocol getUmbilical() {
+      return umbilical;
+    }
+    public LocalDirAllocator getLocalDirAllocator() {
+      return localDirAllocator;
+    }
+    public Reporter getReporter() {
+      return reporter;
+    }
+    public CompressionCodec getCodec() {
+      return codec;
+    }
+    public Class<? extends Reducer> getCombinerClass() {
+      return combinerClass;
+    }
+    public CombineOutputCollector<K, V> getCombineCollector() {
+      return combineCollector;
+    }
+    public Counters.Counter getSpilledRecordsCounter() {
+      return spilledRecordsCounter;
+    }
+    public Counters.Counter getReduceCombineInputCounter() {
+      return reduceCombineInputCounter;
+    }
+    public Counters.Counter getShuffledMapsCounter() {
+      return shuffledMapsCounter;
+    }
+    public Counters.Counter getReduceShuffleBytes() {
+      return reduceShuffleBytes;
+    }
+    public Counters.Counter getFailedShuffleCounter() {
+      return failedShuffleCounter;
+    }
+    public Counters.Counter getMergedMapOutputsCounter() {
+      return mergedMapOutputsCounter;
+    }
+    public TaskStatus getStatus() {
+      return status;
+    }
+    public Progress getCopyPhase() {
+      return copyPhase;
+    }
+    public Progress getMergePhase() {
+      return mergePhase;
+    }
+    public Task getReduceTask() {
+      return reduceTask;
+    }
+    public MapOutputFile getMapOutputFile() {
+      return mapOutputFile;
+    }
+  } // end of public static class Context
+
+}
diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/MRConfig.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/MRConfig.java
index cd560e1a0a6..375391320be 100644
--- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/MRConfig.java
+++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/MRConfig.java
@@ -85,6 +85,9 @@ public interface MRConfig {
   public static final boolean SHUFFLE_SSL_ENABLED_DEFAULT = false;
 
+  public static final String SHUFFLE_CONSUMER_PLUGIN =
+    "mapreduce.job.reduce.shuffle.consumer.plugin.class";
+
   /**
    * Configuration key to enable/disable IFile readahead.
    */
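The ShuffleConsumerPlugin interface and the MRConfig key above are all that a
third-party consumer needs to compile against. A minimal sketch of such a
plugin follows; the package and class names are illustrative only, and the
fetch/merge logic is elided (this stub returns null from run(), as the test
plugin later in this patch does):

    package example.shuffle; // hypothetical package

    import java.io.IOException;
    import org.apache.hadoop.mapred.RawKeyValueIterator;
    import org.apache.hadoop.mapred.ShuffleConsumerPlugin;

    public class ThirdPartyShuffle<K, V> implements ShuffleConsumerPlugin<K, V> {
      private ShuffleConsumerPlugin.Context<K, V> context;

      @Override
      public void init(ShuffleConsumerPlugin.Context<K, V> context) {
        // Keep the context; every task-side handle (umbilical, counters,
        // merge phase, local dirs) is reachable through its getters.
        this.context = context;
      }

      @Override
      public RawKeyValueIterator run() throws IOException, InterruptedException {
        // A real plugin would fetch map output over its own transport, merge
        // it, and return an iterator over the merged key/value stream.
        return null;
      }

      @Override
      public void close() {
        // Release any resources acquired in init()/run().
      }
    }

The built-in Shuffle class is retrofitted onto this same contract next.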
diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/task/reduce/Shuffle.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/task/reduce/Shuffle.java
index e582d2856f4..fc22979797a 100644
--- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/task/reduce/Shuffle.java
+++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/task/reduce/Shuffle.java
@@ -34,73 +34,63 @@ import org.apache.hadoop.mapred.Task;
 import org.apache.hadoop.mapred.Task.CombineOutputCollector;
 import org.apache.hadoop.mapred.TaskStatus;
 import org.apache.hadoop.mapred.TaskUmbilicalProtocol;
+import org.apache.hadoop.mapred.ShuffleConsumerPlugin;
 import org.apache.hadoop.mapreduce.MRJobConfig;
 import org.apache.hadoop.mapreduce.TaskAttemptID;
 import org.apache.hadoop.util.Progress;
 
-@InterfaceAudience.Private
+@InterfaceAudience.LimitedPrivate("mapreduce")
 @InterfaceStability.Unstable
 @SuppressWarnings({"unchecked", "rawtypes"})
-public class Shuffle<K, V> implements ExceptionReporter {
+public class Shuffle<K, V> implements ShuffleConsumerPlugin<K, V>, ExceptionReporter {
   private static final int PROGRESS_FREQUENCY = 2000;
   private static final int MAX_EVENTS_TO_FETCH = 10000;
   private static final int MIN_EVENTS_TO_FETCH = 100;
   private static final int MAX_RPC_OUTSTANDING_EVENTS = 3000000;
 
-  private final TaskAttemptID reduceId;
-  private final JobConf jobConf;
-  private final Reporter reporter;
-  private final ShuffleClientMetrics metrics;
-  private final TaskUmbilicalProtocol umbilical;
+  private ShuffleConsumerPlugin.Context context;
+
+  private TaskAttemptID reduceId;
+  private JobConf jobConf;
+  private Reporter reporter;
+  private ShuffleClientMetrics metrics;
+  private TaskUmbilicalProtocol umbilical;
 
-  private final ShuffleScheduler<K,V> scheduler;
-  private final MergeManager<K, V> merger;
+  private ShuffleScheduler<K,V> scheduler;
+  private MergeManager<K, V> merger;
   private Throwable throwable = null;
   private String throwingThreadName = null;
-  private final Progress copyPhase;
-  private final TaskStatus taskStatus;
-  private final Task reduceTask; //Used for status updates
-
-  public Shuffle(TaskAttemptID reduceId, JobConf jobConf, FileSystem localFS,
-                 TaskUmbilicalProtocol umbilical,
-                 LocalDirAllocator localDirAllocator,
-                 Reporter reporter,
-                 CompressionCodec codec,
-                 Class<? extends Reducer> combinerClass,
-                 CombineOutputCollector<K,V> combineCollector,
-                 Counters.Counter spilledRecordsCounter,
-                 Counters.Counter reduceCombineInputCounter,
-                 Counters.Counter shuffledMapsCounter,
-                 Counters.Counter reduceShuffleBytes,
-                 Counters.Counter failedShuffleCounter,
-                 Counters.Counter mergedMapOutputsCounter,
-                 TaskStatus status,
-                 Progress copyPhase,
-                 Progress mergePhase,
-                 Task reduceTask,
-                 MapOutputFile mapOutputFile) {
-    this.reduceId = reduceId;
-    this.jobConf = jobConf;
-    this.umbilical = umbilical;
-    this.reporter = reporter;
+  private Progress copyPhase;
+  private TaskStatus taskStatus;
+  private Task reduceTask; //Used for status updates
+
+  @Override
+  public void init(ShuffleConsumerPlugin.Context context) {
+    this.context = context;
+
+    this.reduceId = context.getReduceId();
+    this.jobConf = context.getJobConf();
+    this.umbilical = context.getUmbilical();
+    this.reporter = context.getReporter();
     this.metrics = new ShuffleClientMetrics(reduceId, jobConf);
-    this.copyPhase = copyPhase;
-    this.taskStatus = status;
-    this.reduceTask = reduceTask;
+    this.copyPhase = context.getCopyPhase();
+    this.taskStatus = context.getStatus();
+    this.reduceTask = context.getReduceTask();
 
     scheduler =
-      new ShuffleScheduler<K,V>(jobConf, status, this, copyPhase,
-                                shuffledMapsCounter,
-                                reduceShuffleBytes, failedShuffleCounter);
-    merger = new MergeManager<K, V>(reduceId, jobConf, localFS,
-                                    localDirAllocator, reporter, codec,
-                                    combinerClass, combineCollector,
-                                    spilledRecordsCounter,
-                                    reduceCombineInputCounter,
-                                    mergedMapOutputsCounter,
-                                    this, mergePhase, mapOutputFile);
+      new ShuffleScheduler<K,V>(jobConf, taskStatus, this, copyPhase,
+                                context.getShuffledMapsCounter(),
+                                context.getReduceShuffleBytes(), context.getFailedShuffleCounter());
+    merger = new MergeManager<K, V>(reduceId, jobConf, context.getLocalFS(),
+                                    context.getLocalDirAllocator(), reporter, context.getCodec(),
+                                    context.getCombinerClass(), context.getCombineCollector(),
+                                    context.getSpilledRecordsCounter(),
+                                    context.getReduceCombineInputCounter(),
+                                    context.getMergedMapOutputsCounter(),
+                                    this, context.getMergePhase(), context.getMapOutputFile());
   }
 
+  @Override
   public RawKeyValueIterator run() throws IOException, InterruptedException {
     // Scale the maximum events we fetch per RPC call to mitigate OOM issues
     // on the ApplicationMaster when a thundering herd of reducers fetch events
@@ -171,6 +161,10 @@ public class Shuffle<K, V> implements ExceptionReporter {
     return kvIter;
   }
 
+  @Override
+  public void close(){
+  }
+
   public synchronized void reportException(Throwable t) {
     if (throwable == null) {
       throwable = t;
diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/resources/mapred-default.xml b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/resources/mapred-default.xml
index 4a093ea4e27..c11a6a8fde9 100644
--- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/resources/mapred-default.xml
+++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/resources/mapred-default.xml
@@ -1162,6 +1162,16 @@
 </property>
 
+<property>
+  <name>mapreduce.job.reduce.shuffle.consumer.plugin.class</name>
+  <value>org.apache.hadoop.mapreduce.task.reduce.Shuffle</value>
+  <description>
+  Name of the class whose instance will be used
+  to send shuffle requests by reduce tasks of this job.
+  The class must be an instance of org.apache.hadoop.mapred.ShuffleConsumerPlugin.
+  </description>
+</property>
+
 <property>
diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapreduce/TestShufflePlugin.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapreduce/TestShufflePlugin.java
new file mode 100644
index 00000000000..e172be54e84
--- /dev/null
+++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapreduce/TestShufflePlugin.java
@@ -0,0 +1,197 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+package org.apache.hadoop.mapreduce;
+
+import org.junit.Test;
+import static org.junit.Assert.*;
+import static org.mockito.Mockito.*;
+import org.apache.hadoop.yarn.api.records.ApplicationId;
+import org.apache.hadoop.fs.LocalDirAllocator;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.mapred.Task.CombineOutputCollector;
+import org.apache.hadoop.io.compress.CompressionCodec;
+import org.apache.hadoop.util.Progress;
+import org.apache.hadoop.mapred.Reporter;
+import org.apache.hadoop.util.ReflectionUtils;
+import org.apache.hadoop.mapreduce.task.reduce.Shuffle;
+import org.apache.hadoop.mapred.Counters;
+import org.apache.hadoop.mapred.Counters.Counter;
+import org.apache.hadoop.mapred.MapOutputFile;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.Task;
+import org.apache.hadoop.mapred.ReduceTask;
+import org.apache.hadoop.mapred.TaskStatus;
+import org.apache.hadoop.mapred.TaskUmbilicalProtocol;
+import org.apache.hadoop.mapred.ShuffleConsumerPlugin;
+import org.apache.hadoop.mapred.RawKeyValueIterator;
+import org.apache.hadoop.mapred.Reducer;
+
+/**
+ * A JUnit for testing availability and accessibility of shuffle related API.
+ * It is needed for maintaining compatibility with external sub-classes of
+ * ShuffleConsumerPlugin and AuxiliaryService(s) like ShuffleHandler.
+ *
+ * The importance of this test is for preserving API with 3rd party plugins.
+ */
+public class TestShufflePlugin<K, V> {
+
+  static class TestShuffleConsumerPlugin<K, V> implements ShuffleConsumerPlugin<K, V> {
+
+    @Override
+    public void init(ShuffleConsumerPlugin.Context<K, V> context) {
+      // just verify that Context has kept its public interface
+      context.getReduceId();
+      context.getJobConf();
+      context.getLocalFS();
+      context.getUmbilical();
+      context.getLocalDirAllocator();
+      context.getReporter();
+      context.getCodec();
+      context.getCombinerClass();
+      context.getCombineCollector();
+      context.getSpilledRecordsCounter();
+      context.getReduceCombineInputCounter();
+      context.getShuffledMapsCounter();
+      context.getReduceShuffleBytes();
+      context.getFailedShuffleCounter();
+      context.getMergedMapOutputsCounter();
+      context.getStatus();
+      context.getCopyPhase();
+      context.getMergePhase();
+      context.getReduceTask();
+      context.getMapOutputFile();
+    }
+
+    @Override
+    public void close(){
+    }
+
+    @Override
+    public RawKeyValueIterator run() throws java.io.IOException, java.lang.InterruptedException{
+      return null;
+    }
+  }
+
+  @Test
+  /**
+   * A testing method instructing core hadoop to load an external ShuffleConsumerPlugin
+   * as if it came from a 3rd party.
+   */
+  public void testPluginAbility() {
+
+    try{
+      // create JobConf with mapreduce.job.shuffle.consumer.plugin=TestShuffleConsumerPlugin
+      JobConf jobConf = new JobConf();
+      jobConf.setClass(MRConfig.SHUFFLE_CONSUMER_PLUGIN,
+                       TestShufflePlugin.TestShuffleConsumerPlugin.class,
+                       ShuffleConsumerPlugin.class);
+
+      ShuffleConsumerPlugin shuffleConsumerPlugin = null;
+      Class<? extends ShuffleConsumerPlugin> clazz =
+        jobConf.getClass(MRConfig.SHUFFLE_CONSUMER_PLUGIN, Shuffle.class, ShuffleConsumerPlugin.class);
+      assertNotNull("Unable to get " + MRConfig.SHUFFLE_CONSUMER_PLUGIN, clazz);
+
+      // load 3rd party plugin through core's factory method
+      shuffleConsumerPlugin = ReflectionUtils.newInstance(clazz, jobConf);
+      assertNotNull("Unable to load " + MRConfig.SHUFFLE_CONSUMER_PLUGIN, shuffleConsumerPlugin);
+    }
+    catch (Exception e) {
+      assertTrue("Threw exception:" + e, false);
+    }
+  }
+
+  @Test
+  /**
+   * A testing method verifying availability and accessibility of API that is needed
+   * for sub-classes of ShuffleConsumerPlugin
+   */
+  public void testConsumerApi() {
+
+    JobConf jobConf = new JobConf();
+    ShuffleConsumerPlugin<K, V> shuffleConsumerPlugin = new TestShuffleConsumerPlugin<K, V>();
+
+    //mock creation
+    ReduceTask mockReduceTask = mock(ReduceTask.class);
+    TaskUmbilicalProtocol mockUmbilical = mock(TaskUmbilicalProtocol.class);
+    Reporter mockReporter = mock(Reporter.class);
+    FileSystem mockFileSystem = mock(FileSystem.class);
+    Class<? extends Reducer> combinerClass = jobConf.getCombinerClass();
+    @SuppressWarnings("unchecked")  // needed for mock with generic
+    CombineOutputCollector<K, V> mockCombineOutputCollector =
+      (CombineOutputCollector<K, V>) mock(CombineOutputCollector.class);
+    org.apache.hadoop.mapreduce.TaskAttemptID mockTaskAttemptID =
+      mock(org.apache.hadoop.mapreduce.TaskAttemptID.class);
+    LocalDirAllocator mockLocalDirAllocator = mock(LocalDirAllocator.class);
+    CompressionCodec mockCompressionCodec = mock(CompressionCodec.class);
+    Counter mockCounter = mock(Counter.class);
+    TaskStatus mockTaskStatus = mock(TaskStatus.class);
+    Progress mockProgress = mock(Progress.class);
+    MapOutputFile mockMapOutputFile = mock(MapOutputFile.class);
+    Task mockTask = mock(Task.class);
+
+    try {
+      String [] dirs = jobConf.getLocalDirs();
+      // verify that these APIs are available through super class handler
+      ShuffleConsumerPlugin.Context<K, V> context =
+        new ShuffleConsumerPlugin.Context<K, V>(mockTaskAttemptID, jobConf, mockFileSystem,
+                                                mockUmbilical, mockLocalDirAllocator,
+                                                mockReporter, mockCompressionCodec,
+                                                combinerClass, mockCombineOutputCollector,
+                                                mockCounter, mockCounter, mockCounter,
+                                                mockCounter, mockCounter, mockCounter,
+                                                mockTaskStatus, mockProgress, mockProgress,
+                                                mockTask, mockMapOutputFile);
+      shuffleConsumerPlugin.init(context);
+      shuffleConsumerPlugin.run();
+      shuffleConsumerPlugin.close();
+    }
+    catch (Exception e) {
+      assertTrue("Threw exception:" + e, false);
+    }
+
+    // verify that these APIs are available for 3rd party plugins
+    mockReduceTask.getTaskID();
+    mockReduceTask.getJobID();
+    mockReduceTask.getNumMaps();
+    mockReduceTask.getPartition();
+    mockReporter.progress();
+  }
+
+  @Test
+  /**
+   * A testing method verifying availability and accessibility of API needed for
+   * AuxiliaryService(s) which are "Shuffle-Providers" (ShuffleHandler and 3rd party plugins)
+   */
+  public void testProviderApi() {
+
+    ApplicationId mockApplicationId = mock(ApplicationId.class);
+    mockApplicationId.setClusterTimestamp(new Long(10));
+    mockApplicationId.setId(mock(JobID.class).getId());
+    LocalDirAllocator mockLocalDirAllocator = mock(LocalDirAllocator.class);
+    JobConf mockJobConf = mock(JobConf.class);
+    try {
+      mockLocalDirAllocator.getLocalPathToRead("", mockJobConf);
+    }
+    catch (Exception e) {
+      assertTrue("Threw exception:" + e, false);
+    }
+  }
+}
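Besides the per-job setClass() route exercised by testPluginAbility above, a
site can switch the default for all jobs by overriding the property this patch
adds to mapred-default.xml. A hypothetical mapred-site.xml entry, pointing at
the illustrative plugin sketched earlier:

    <property>
      <name>mapreduce.job.reduce.shuffle.consumer.plugin.class</name>
      <value>example.shuffle.ThirdPartyShuffle</value>
    </property>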