svn merge -c 1536711 FIXES: MAPREDUCE-5596. Allow configuring the number of threads used to serve shuffle connections. Contributed by Sandy Ryza
git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/branches/branch-2@1536720 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
parent
67365f3172
commit
72d881132f
|
@ -75,6 +75,9 @@ Release 2.2.1 - UNRELEASED
|
||||||
MAPREDUCE-5457. Add a KeyOnlyTextOutputReader to enable streaming to write
|
MAPREDUCE-5457. Add a KeyOnlyTextOutputReader to enable streaming to write
|
||||||
out text files without separators (Sandy Ryza)
|
out text files without separators (Sandy Ryza)
|
||||||
|
|
||||||
|
MAPREDUCE-5596. Allow configuring the number of threads used to serve
|
||||||
|
shuffle connections (Sandy Ryza via jlowe)
|
||||||
|
|
||||||
OPTIMIZATIONS
|
OPTIMIZATIONS
|
||||||
|
|
||||||
MAPREDUCE-4680. Job history cleaner should only check timestamps of files in
|
MAPREDUCE-4680. Job history cleaner should only check timestamps of files in
|
||||||
|
|
|
@ -560,6 +560,16 @@
|
||||||
</description>
|
</description>
|
||||||
</property>
|
</property>
|
||||||
|
|
||||||
|
<property>
|
||||||
|
<name>mapreduce.shuffle.max.threads</name>
|
||||||
|
<value>0</value>
|
||||||
|
<description>Max allowed threads for serving shuffle connections. Set to zero
|
||||||
|
to indicate the default of 2 times the number of available
|
||||||
|
processors (as reported by Runtime.availableProcessors()). Netty is used to
|
||||||
|
serve requests, so a thread is not needed for each connection.
|
||||||
|
</description>
|
||||||
|
</property>
|
||||||
|
|
||||||
<property>
|
<property>
|
||||||
<name>mapreduce.reduce.markreset.buffer.percent</name>
|
<name>mapreduce.reduce.markreset.buffer.percent</name>
|
||||||
<value>0.0</value>
|
<value>0.0</value>
|
||||||
|
|
|
@ -163,6 +163,10 @@ public class ShuffleHandler extends AuxiliaryService {
|
||||||
|
|
||||||
public static final String MAX_SHUFFLE_CONNECTIONS = "mapreduce.shuffle.max.connections";
|
public static final String MAX_SHUFFLE_CONNECTIONS = "mapreduce.shuffle.max.connections";
|
||||||
public static final int DEFAULT_MAX_SHUFFLE_CONNECTIONS = 0; // 0 implies no limit
|
public static final int DEFAULT_MAX_SHUFFLE_CONNECTIONS = 0; // 0 implies no limit
|
||||||
|
|
||||||
|
public static final String MAX_SHUFFLE_THREADS = "mapreduce.shuffle.max.threads";
|
||||||
|
// 0 implies Netty default of 2 * number of available processors
|
||||||
|
public static final int DEFAULT_MAX_SHUFFLE_THREADS = 0;
|
||||||
|
|
||||||
@Metrics(about="Shuffle output metrics", context="mapred")
|
@Metrics(about="Shuffle output metrics", context="mapred")
|
||||||
static class ShuffleMetrics implements ChannelFutureListener {
|
static class ShuffleMetrics implements ChannelFutureListener {
|
||||||
|
@ -282,6 +286,11 @@ public class ShuffleHandler extends AuxiliaryService {
|
||||||
|
|
||||||
maxShuffleConnections = conf.getInt(MAX_SHUFFLE_CONNECTIONS,
|
maxShuffleConnections = conf.getInt(MAX_SHUFFLE_CONNECTIONS,
|
||||||
DEFAULT_MAX_SHUFFLE_CONNECTIONS);
|
DEFAULT_MAX_SHUFFLE_CONNECTIONS);
|
||||||
|
int maxShuffleThreads = conf.getInt(MAX_SHUFFLE_THREADS,
|
||||||
|
DEFAULT_MAX_SHUFFLE_THREADS);
|
||||||
|
if (maxShuffleThreads == 0) {
|
||||||
|
maxShuffleThreads = 2 * Runtime.getRuntime().availableProcessors();
|
||||||
|
}
|
||||||
|
|
||||||
ThreadFactory bossFactory = new ThreadFactoryBuilder()
|
ThreadFactory bossFactory = new ThreadFactoryBuilder()
|
||||||
.setNameFormat("ShuffleHandler Netty Boss #%d")
|
.setNameFormat("ShuffleHandler Netty Boss #%d")
|
||||||
|
@ -292,7 +301,8 @@ public class ShuffleHandler extends AuxiliaryService {
|
||||||
|
|
||||||
selector = new NioServerSocketChannelFactory(
|
selector = new NioServerSocketChannelFactory(
|
||||||
Executors.newCachedThreadPool(bossFactory),
|
Executors.newCachedThreadPool(bossFactory),
|
||||||
Executors.newCachedThreadPool(workerFactory));
|
Executors.newCachedThreadPool(workerFactory),
|
||||||
|
maxShuffleThreads);
|
||||||
super.serviceInit(new Configuration(conf));
|
super.serviceInit(new Configuration(conf));
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
Loading…
Reference in New Issue