From e0c99b80d47aaf65ee51c601cb8124ce5df79b2f Mon Sep 17 00:00:00 2001 From: Jason Darrell Lowe Date: Tue, 29 Oct 2013 13:41:50 +0000 Subject: [PATCH] MAPREDUCE-5596. Allow configuring the number of threads used to serve shuffle connections. Contributed by Sandy Ryza git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/trunk@1536711 13f79535-47bb-0310-9956-ffa450edef68 --- hadoop-mapreduce-project/CHANGES.txt | 3 +++ .../src/main/resources/mapred-default.xml | 10 ++++++++++ .../org/apache/hadoop/mapred/ShuffleHandler.java | 12 +++++++++++- 3 files changed, 24 insertions(+), 1 deletion(-) diff --git a/hadoop-mapreduce-project/CHANGES.txt b/hadoop-mapreduce-project/CHANGES.txt index d81fc50246c..b9db11cf173 100644 --- a/hadoop-mapreduce-project/CHANGES.txt +++ b/hadoop-mapreduce-project/CHANGES.txt @@ -212,6 +212,9 @@ Release 2.2.1 - UNRELEASED MAPREDUCE-5457. Add a KeyOnlyTextOutputReader to enable streaming to write out text files without separators (Sandy Ryza) + MAPREDUCE-5596. Allow configuring the number of threads used to serve + shuffle connections (Sandy Ryza via jlowe) + OPTIMIZATIONS MAPREDUCE-4680. Job history cleaner should only check timestamps of files in diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/resources/mapred-default.xml b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/resources/mapred-default.xml index 598d106ce95..29facecb6b1 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/resources/mapred-default.xml +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/resources/mapred-default.xml @@ -304,6 +304,16 @@ + + mapreduce.shuffle.max.threads + 0 + Max allowed threads for serving shuffle connections. Set to zero + to indicate the default of 2 times the number of available + processors (as reported by Runtime.availableProcessors()). Netty is used to + serve requests, so a thread is not needed for each connection. + + + mapreduce.reduce.markreset.buffer.percent 0.0 diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-shuffle/src/main/java/org/apache/hadoop/mapred/ShuffleHandler.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-shuffle/src/main/java/org/apache/hadoop/mapred/ShuffleHandler.java index 82fd59e5516..9f377e23ac0 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-shuffle/src/main/java/org/apache/hadoop/mapred/ShuffleHandler.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-shuffle/src/main/java/org/apache/hadoop/mapred/ShuffleHandler.java @@ -163,6 +163,10 @@ public class ShuffleHandler extends AuxiliaryService { public static final String MAX_SHUFFLE_CONNECTIONS = "mapreduce.shuffle.max.connections"; public static final int DEFAULT_MAX_SHUFFLE_CONNECTIONS = 0; // 0 implies no limit + + public static final String MAX_SHUFFLE_THREADS = "mapreduce.shuffle.max.threads"; + // 0 implies Netty default of 2 * number of available processors + public static final int DEFAULT_MAX_SHUFFLE_THREADS = 0; @Metrics(about="Shuffle output metrics", context="mapred") static class ShuffleMetrics implements ChannelFutureListener { @@ -282,6 +286,11 @@ public class ShuffleHandler extends AuxiliaryService { maxShuffleConnections = conf.getInt(MAX_SHUFFLE_CONNECTIONS, DEFAULT_MAX_SHUFFLE_CONNECTIONS); + int maxShuffleThreads = conf.getInt(MAX_SHUFFLE_THREADS, + DEFAULT_MAX_SHUFFLE_THREADS); + if (maxShuffleThreads == 0) { + maxShuffleThreads = 2 * Runtime.getRuntime().availableProcessors(); + } ThreadFactory bossFactory = new ThreadFactoryBuilder() .setNameFormat("ShuffleHandler Netty Boss #%d") @@ -292,7 +301,8 @@ public class ShuffleHandler extends AuxiliaryService { selector = new NioServerSocketChannelFactory( Executors.newCachedThreadPool(bossFactory), - Executors.newCachedThreadPool(workerFactory)); + Executors.newCachedThreadPool(workerFactory), + maxShuffleThreads); super.serviceInit(new Configuration(conf)); }