merge MAPREDUCE-5329. Allow MR applications to use additional AuxServices, which are compatible with the default MapReduce shuffle. Contributed by Avner BenHanoch.
git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/branches/branch-2@1531742 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
parent
d890e365c8
commit
5ff9f8bed2
@ -61,6 +61,10 @@ Release 2.2.1 - UNRELEASED
|
|||||||
NEW FEATURES
|
NEW FEATURES
|
||||||
|
|
||||||
IMPROVEMENTS
|
IMPROVEMENTS
|
||||||
|
|
||||||
|
MAPREDUCE-5329. Allow MR applications to use additional AuxServices,
|
||||||
|
which are compatible with the default MapReduce shuffle.
|
||||||
|
(Avner BenHanoch via sseth)
|
||||||
|
|
||||||
OPTIMIZATIONS
|
OPTIMIZATIONS
|
||||||
|
|
||||||
|
@ -31,6 +31,7 @@
|
|||||||
import java.util.Map;
|
import java.util.Map;
|
||||||
import java.util.Map.Entry;
|
import java.util.Map.Entry;
|
||||||
import java.util.Set;
|
import java.util.Set;
|
||||||
|
import java.util.Collection;
|
||||||
import java.util.concurrent.atomic.AtomicBoolean;
|
import java.util.concurrent.atomic.AtomicBoolean;
|
||||||
import java.util.concurrent.locks.Lock;
|
import java.util.concurrent.locks.Lock;
|
||||||
import java.util.concurrent.locks.ReadWriteLock;
|
import java.util.concurrent.locks.ReadWriteLock;
|
||||||
@ -722,6 +723,32 @@ private static ContainerLaunchContext createCommonContainerLaunchContext(
|
|||||||
serviceData.put(ShuffleHandler.MAPREDUCE_SHUFFLE_SERVICEID,
|
serviceData.put(ShuffleHandler.MAPREDUCE_SHUFFLE_SERVICEID,
|
||||||
ShuffleHandler.serializeServiceData(shuffleToken));
|
ShuffleHandler.serializeServiceData(shuffleToken));
|
||||||
|
|
||||||
|
// add external shuffle-providers - if any
|
||||||
|
Collection<String> shuffleProviders = conf.getStringCollection(
|
||||||
|
MRJobConfig.MAPREDUCE_JOB_SHUFFLE_PROVIDER_SERVICES);
|
||||||
|
if (! shuffleProviders.isEmpty()) {
|
||||||
|
Collection<String> auxNames = conf.getStringCollection(
|
||||||
|
YarnConfiguration.NM_AUX_SERVICES);
|
||||||
|
|
||||||
|
for (final String shuffleProvider : shuffleProviders) {
|
||||||
|
if (shuffleProvider.equals(ShuffleHandler.MAPREDUCE_SHUFFLE_SERVICEID)) {
|
||||||
|
continue; // skip built-in shuffle-provider that was already inserted with shuffle secret key
|
||||||
|
}
|
||||||
|
if (auxNames.contains(shuffleProvider)) {
|
||||||
|
LOG.info("Adding ShuffleProvider Service: " + shuffleProvider + " to serviceData");
|
||||||
|
// This only serves for INIT_APP notifications
|
||||||
|
// The shuffle service needs to be able to work with the host:port information provided by the AM
|
||||||
|
// (i.e. shuffle services which require custom location / other configuration are not supported)
|
||||||
|
serviceData.put(shuffleProvider, ByteBuffer.allocate(0));
|
||||||
|
}
|
||||||
|
else {
|
||||||
|
throw new YarnRuntimeException("ShuffleProvider Service: " + shuffleProvider +
|
||||||
|
" was NOT found in the list of aux-services that are available in this NM." +
|
||||||
|
" You may need to specify this ShuffleProvider as an aux-service in your yarn-site.xml");
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
Apps.addToEnvironment(
|
Apps.addToEnvironment(
|
||||||
environment,
|
environment,
|
||||||
Environment.CLASSPATH.name(),
|
Environment.CLASSPATH.name(),
|
||||||
|
@ -0,0 +1,159 @@
|
|||||||
|
/**
|
||||||
|
/**
|
||||||
|
* Licensed to the Apache Software Foundation (ASF) under one
|
||||||
|
* or more contributor license agreements. See the NOTICE file
|
||||||
|
* distributed with this work for additional information
|
||||||
|
* regarding copyright ownership. The ASF licenses this file
|
||||||
|
* to you under the Apache License, Version 2.0 (the
|
||||||
|
* "License"); you may not use this file except in compliance
|
||||||
|
* with the License. You may obtain a copy of the License at
|
||||||
|
*
|
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
*
|
||||||
|
* Unless required by applicable law or agreed to in writing, software
|
||||||
|
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
* See the License for the specific language governing permissions and
|
||||||
|
* limitations under the License.
|
||||||
|
*/
|
||||||
|
|
||||||
|
package org.apache.hadoop.mapreduce.v2.app.job.impl;
|
||||||
|
|
||||||
|
import static org.mockito.Mockito.mock;
|
||||||
|
import static org.mockito.Mockito.when;
|
||||||
|
|
||||||
|
import java.io.IOException;
|
||||||
|
import java.net.InetSocketAddress;
|
||||||
|
import java.util.Map;
|
||||||
|
import java.nio.ByteBuffer;
|
||||||
|
|
||||||
|
import org.apache.hadoop.fs.FileStatus;
|
||||||
|
import org.apache.hadoop.fs.FileSystem;
|
||||||
|
import org.apache.hadoop.fs.Path;
|
||||||
|
import org.apache.hadoop.fs.RawLocalFileSystem;
|
||||||
|
import org.apache.hadoop.io.Text;
|
||||||
|
import org.apache.hadoop.mapred.JobConf;
|
||||||
|
import org.apache.hadoop.mapred.MapTaskAttemptImpl;
|
||||||
|
import org.apache.hadoop.mapred.WrappedJvmID;
|
||||||
|
import org.apache.hadoop.mapreduce.MRJobConfig;
|
||||||
|
import org.apache.hadoop.mapreduce.TypeConverter;
|
||||||
|
import org.apache.hadoop.mapreduce.security.token.JobTokenIdentifier;
|
||||||
|
import org.apache.hadoop.mapreduce.split.JobSplit.TaskSplitMetaInfo;
|
||||||
|
import org.apache.hadoop.mapreduce.v2.api.records.JobId;
|
||||||
|
import org.apache.hadoop.mapreduce.v2.api.records.TaskId;
|
||||||
|
import org.apache.hadoop.mapreduce.v2.api.records.TaskType;
|
||||||
|
import org.apache.hadoop.mapreduce.v2.app.TaskAttemptListener;
|
||||||
|
import org.apache.hadoop.mapreduce.v2.util.MRBuilderUtils;
|
||||||
|
import org.apache.hadoop.security.Credentials;
|
||||||
|
import org.apache.hadoop.security.token.Token;
|
||||||
|
import org.apache.hadoop.yarn.api.records.ApplicationId;
|
||||||
|
import org.apache.hadoop.yarn.api.records.ContainerLaunchContext;
|
||||||
|
import org.apache.hadoop.yarn.event.EventHandler;
|
||||||
|
import org.apache.hadoop.yarn.util.SystemClock;
|
||||||
|
import org.apache.hadoop.yarn.server.api.AuxiliaryService;
|
||||||
|
import org.apache.hadoop.yarn.server.api.ApplicationInitializationContext;
|
||||||
|
import org.apache.hadoop.yarn.server.api.ApplicationTerminationContext;
|
||||||
|
import org.apache.hadoop.yarn.conf.YarnConfiguration;
|
||||||
|
import org.junit.Test;
|
||||||
|
import org.junit.Assert;
|
||||||
|
|
||||||
|
public class TestShuffleProvider {
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testShuffleProviders() throws Exception {
|
||||||
|
ApplicationId appId = ApplicationId.newInstance(1, 1);
|
||||||
|
JobId jobId = MRBuilderUtils.newJobId(appId, 1);
|
||||||
|
TaskId taskId = MRBuilderUtils.newTaskId(jobId, 1, TaskType.MAP);
|
||||||
|
Path jobFile = mock(Path.class);
|
||||||
|
|
||||||
|
EventHandler eventHandler = mock(EventHandler.class);
|
||||||
|
TaskAttemptListener taListener = mock(TaskAttemptListener.class);
|
||||||
|
when(taListener.getAddress()).thenReturn(new InetSocketAddress("localhost", 0));
|
||||||
|
|
||||||
|
JobConf jobConf = new JobConf();
|
||||||
|
jobConf.setClass("fs.file.impl", StubbedFS.class, FileSystem.class);
|
||||||
|
jobConf.setBoolean("fs.file.impl.disable.cache", true);
|
||||||
|
jobConf.set(JobConf.MAPRED_MAP_TASK_ENV, "");
|
||||||
|
|
||||||
|
jobConf.set(YarnConfiguration.NM_AUX_SERVICES,
|
||||||
|
TestShuffleHandler1.MAPREDUCE_TEST_SHUFFLE_SERVICEID + "," +
|
||||||
|
TestShuffleHandler2.MAPREDUCE_TEST_SHUFFLE_SERVICEID);
|
||||||
|
|
||||||
|
String serviceName = TestShuffleHandler1.MAPREDUCE_TEST_SHUFFLE_SERVICEID;
|
||||||
|
String serviceStr = String.format(YarnConfiguration.NM_AUX_SERVICE_FMT, serviceName);
|
||||||
|
jobConf.set(serviceStr, TestShuffleHandler1.class.getName());
|
||||||
|
|
||||||
|
serviceName = TestShuffleHandler2.MAPREDUCE_TEST_SHUFFLE_SERVICEID;
|
||||||
|
serviceStr = String.format(YarnConfiguration.NM_AUX_SERVICE_FMT, serviceName);
|
||||||
|
jobConf.set(serviceStr, TestShuffleHandler2.class.getName());
|
||||||
|
|
||||||
|
jobConf.set(MRJobConfig.MAPREDUCE_JOB_SHUFFLE_PROVIDER_SERVICES,
|
||||||
|
TestShuffleHandler1.MAPREDUCE_TEST_SHUFFLE_SERVICEID
|
||||||
|
+ "," + TestShuffleHandler2.MAPREDUCE_TEST_SHUFFLE_SERVICEID);
|
||||||
|
|
||||||
|
Credentials credentials = new Credentials();
|
||||||
|
Token<JobTokenIdentifier> jobToken = new Token<JobTokenIdentifier>(
|
||||||
|
("tokenid").getBytes(), ("tokenpw").getBytes(),
|
||||||
|
new Text("tokenkind"), new Text("tokenservice"));
|
||||||
|
TaskAttemptImpl taImpl =
|
||||||
|
new MapTaskAttemptImpl(taskId, 1, eventHandler, jobFile, 1,
|
||||||
|
mock(TaskSplitMetaInfo.class), jobConf, taListener,
|
||||||
|
jobToken, credentials,
|
||||||
|
new SystemClock(), null);
|
||||||
|
|
||||||
|
jobConf.set(MRJobConfig.APPLICATION_ATTEMPT_ID, taImpl.getID().toString());
|
||||||
|
|
||||||
|
ContainerLaunchContext launchCtx =
|
||||||
|
TaskAttemptImpl.createContainerLaunchContext(null,
|
||||||
|
jobConf, jobToken, taImpl.createRemoteTask(),
|
||||||
|
TypeConverter.fromYarn(jobId),
|
||||||
|
mock(WrappedJvmID.class), taListener,
|
||||||
|
credentials);
|
||||||
|
|
||||||
|
Map<String, ByteBuffer> serviceDataMap = launchCtx.getServiceData();
|
||||||
|
Assert.assertNotNull("TestShuffleHandler1 is missing", serviceDataMap.get(TestShuffleHandler1.MAPREDUCE_TEST_SHUFFLE_SERVICEID));
|
||||||
|
Assert.assertNotNull("TestShuffleHandler2 is missing", serviceDataMap.get(TestShuffleHandler2.MAPREDUCE_TEST_SHUFFLE_SERVICEID));
|
||||||
|
Assert.assertTrue("mismatch number of services in map", serviceDataMap.size() == 3); // 2 that we entered + 1 for the built-in shuffle-provider
|
||||||
|
}
|
||||||
|
|
||||||
|
static public class StubbedFS extends RawLocalFileSystem {
|
||||||
|
@Override
|
||||||
|
public FileStatus getFileStatus(Path f) throws IOException {
|
||||||
|
return new FileStatus(1, false, 1, 1, 1, f);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
static public class TestShuffleHandler1 extends AuxiliaryService {
|
||||||
|
public static final String MAPREDUCE_TEST_SHUFFLE_SERVICEID = "test_shuffle1";
|
||||||
|
public TestShuffleHandler1() {
|
||||||
|
super("testshuffle1");
|
||||||
|
}
|
||||||
|
@Override
|
||||||
|
public void initializeApplication(ApplicationInitializationContext context) {
|
||||||
|
}
|
||||||
|
@Override
|
||||||
|
public void stopApplication(ApplicationTerminationContext context) {
|
||||||
|
}
|
||||||
|
@Override
|
||||||
|
public synchronized ByteBuffer getMetaData() {
|
||||||
|
return ByteBuffer.allocate(0); // Don't 'return null' because of YARN-1256
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
static public class TestShuffleHandler2 extends AuxiliaryService {
|
||||||
|
public static final String MAPREDUCE_TEST_SHUFFLE_SERVICEID = "test_shuffle2";
|
||||||
|
public TestShuffleHandler2() {
|
||||||
|
super("testshuffle2");
|
||||||
|
}
|
||||||
|
@Override
|
||||||
|
public void initializeApplication(ApplicationInitializationContext context) {
|
||||||
|
}
|
||||||
|
@Override
|
||||||
|
public void stopApplication(ApplicationTerminationContext context) {
|
||||||
|
}
|
||||||
|
@Override
|
||||||
|
public synchronized ByteBuffer getMetaData() {
|
||||||
|
return ByteBuffer.allocate(0); // Don't 'return null' because of YARN-1256
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
@ -133,6 +133,13 @@ public interface MRJobConfig {
|
|||||||
|
|
||||||
public static final String MAPREDUCE_JOB_CLASSLOADER = "mapreduce.job.classloader";
|
public static final String MAPREDUCE_JOB_CLASSLOADER = "mapreduce.job.classloader";
|
||||||
|
|
||||||
|
/**
|
||||||
|
* A comma-separated list of services that function as ShuffleProvider aux-services
|
||||||
|
* (in addition to the built-in ShuffleHandler).
|
||||||
|
* These services can serve shuffle requests from reducetasks.
|
||||||
|
*/
|
||||||
|
public static final String MAPREDUCE_JOB_SHUFFLE_PROVIDER_SERVICES = "mapreduce.job.shuffle.provider.services";
|
||||||
|
|
||||||
public static final String MAPREDUCE_JOB_CLASSLOADER_SYSTEM_CLASSES = "mapreduce.job.classloader.system.classes";
|
public static final String MAPREDUCE_JOB_CLASSLOADER_SYSTEM_CLASSES = "mapreduce.job.classloader.system.classes";
|
||||||
|
|
||||||
public static final String IO_SORT_FACTOR = "mapreduce.task.io.sort.factor";
|
public static final String IO_SORT_FACTOR = "mapreduce.task.io.sort.factor";
|
||||||
|
Loading…
x
Reference in New Issue
Block a user