From 9d38520c8e42530a817a7f69c9aa73a9ad40639c Mon Sep 17 00:00:00 2001 From: Tsuyoshi Ozawa Date: Sat, 14 Mar 2015 16:44:02 +0900 Subject: [PATCH] MAPREDUCE-6265. Make ContainerLauncherImpl.INITIAL_POOL_SIZE configurable to better control to launch/kill containers. Contributed by Zhihai Xu --- hadoop-mapreduce-project/CHANGES.txt | 3 +++ .../app/launcher/ContainerLauncherImpl.java | 14 +++++++++---- .../app/launcher/TestContainerLauncher.java | 21 ++++++++++++++----- .../apache/hadoop/mapreduce/MRJobConfig.java | 8 +++++++ .../src/main/resources/mapred-default.xml | 8 +++++++ 5 files changed, 45 insertions(+), 9 deletions(-) diff --git a/hadoop-mapreduce-project/CHANGES.txt b/hadoop-mapreduce-project/CHANGES.txt index 0bbe85c7d5a..ab6eef5b902 100644 --- a/hadoop-mapreduce-project/CHANGES.txt +++ b/hadoop-mapreduce-project/CHANGES.txt @@ -340,6 +340,9 @@ Release 2.7.0 - UNRELEASED MAPREDUCE-6263. Configurable timeout between YARNRunner terminate the application and forcefully kill. (Eric Payne via junping_du) + MAPREDUCE-6265. Make ContainerLauncherImpl.INITIAL_POOL_SIZE configurable + to better control to launch/kill containers. (Zhihai Xu via ozawa) + OPTIMIZATIONS MAPREDUCE-6169. 
MergeQueue should release reference to the current item diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/launcher/ContainerLauncherImpl.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/launcher/ContainerLauncherImpl.java index 666f757b540..9c1125d4ec2 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/launcher/ContainerLauncherImpl.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/launcher/ContainerLauncherImpl.java @@ -70,7 +70,7 @@ public class ContainerLauncherImpl extends AbstractService implements new ConcurrentHashMap(); private final AppContext context; protected ThreadPoolExecutor launcherPool; - protected static final int INITIAL_POOL_SIZE = 10; + protected int initialPoolSize; private int limitOnPoolSize; private Thread eventHandlingThread; protected BlockingQueue eventQueue = @@ -246,6 +246,12 @@ public class ContainerLauncherImpl extends AbstractService implements MRJobConfig.MR_AM_CONTAINERLAUNCHER_THREAD_COUNT_LIMIT, MRJobConfig.DEFAULT_MR_AM_CONTAINERLAUNCHER_THREAD_COUNT_LIMIT); LOG.info("Upper limit on the thread pool size is " + this.limitOnPoolSize); + + this.initialPoolSize = conf.getInt( + MRJobConfig.MR_AM_CONTAINERLAUNCHER_THREADPOOL_INITIAL_SIZE, + MRJobConfig.DEFAULT_MR_AM_CONTAINERLAUNCHER_THREADPOOL_INITIAL_SIZE); + LOG.info("The thread pool initial size is " + this.initialPoolSize); + super.serviceInit(conf); cmProxy = new ContainerManagementProtocolProxy(conf); } @@ -256,7 +262,7 @@ public class ContainerLauncherImpl extends AbstractService implements "ContainerLauncher #%d").setDaemon(true).build(); // Start with a default core-pool size of 10 and change it dynamically. 
- launcherPool = new ThreadPoolExecutor(INITIAL_POOL_SIZE, + launcherPool = new ThreadPoolExecutor(initialPoolSize, Integer.MAX_VALUE, 1, TimeUnit.HOURS, new LinkedBlockingQueue(), tf); @@ -289,11 +295,11 @@ public class ContainerLauncherImpl extends AbstractService implements int idealPoolSize = Math.min(limitOnPoolSize, numNodes); if (poolSize < idealPoolSize) { - // Bump up the pool size to idealPoolSize+INITIAL_POOL_SIZE, the + // Bump up the pool size to idealPoolSize+initialPoolSize, the // later is just a buffer so we are not always increasing the // pool-size int newPoolSize = Math.min(limitOnPoolSize, idealPoolSize - + INITIAL_POOL_SIZE); + + initialPoolSize); LOG.info("Setting ContainerLauncher pool size to " + newPoolSize + " as number-of-nodes to talk to is " + numNodes); launcherPool.setCorePoolSize(newPoolSize); diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/launcher/TestContainerLauncher.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/launcher/TestContainerLauncher.java index dc1d72f89f0..41ee65dcf97 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/launcher/TestContainerLauncher.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/launcher/TestContainerLauncher.java @@ -90,7 +90,7 @@ public class TestContainerLauncher { static final Log LOG = LogFactory.getLog(TestContainerLauncher.class); - @Test (timeout = 5000) + @Test (timeout = 10000) public void testPoolSize() throws InterruptedException { ApplicationId appId = ApplicationId.newInstance(12345, 67); @@ -108,12 +108,14 @@ public class TestContainerLauncher { ThreadPoolExecutor threadPool = containerLauncher.getThreadPool(); // No events yet + 
Assert.assertEquals(containerLauncher.initialPoolSize, + MRJobConfig.DEFAULT_MR_AM_CONTAINERLAUNCHER_THREADPOOL_INITIAL_SIZE); Assert.assertEquals(0, threadPool.getPoolSize()); - Assert.assertEquals(ContainerLauncherImpl.INITIAL_POOL_SIZE, + Assert.assertEquals(containerLauncher.initialPoolSize, threadPool.getCorePoolSize()); Assert.assertNull(containerLauncher.foundErrors); - containerLauncher.expectedCorePoolSize = ContainerLauncherImpl.INITIAL_POOL_SIZE; + containerLauncher.expectedCorePoolSize = containerLauncher.initialPoolSize; for (int i = 0; i < 10; i++) { ContainerId containerId = ContainerId.newContainerId(appAttemptId, i); TaskAttemptId taskAttemptId = MRBuilderUtils.newTaskAttemptId(taskId, i); @@ -152,7 +154,7 @@ public class TestContainerLauncher { // Different hosts, there should be an increase in core-thread-pool size to // 21(11hosts+10buffer) // Core pool size should be 21 but the live pool size should be only 11. - containerLauncher.expectedCorePoolSize = 11 + ContainerLauncherImpl.INITIAL_POOL_SIZE; + containerLauncher.expectedCorePoolSize = 11 + containerLauncher.initialPoolSize; containerLauncher.finishEventHandling = false; ContainerId containerId = ContainerId.newContainerId(appAttemptId, 21); TaskAttemptId taskAttemptId = MRBuilderUtils.newTaskAttemptId(taskId, 21); @@ -164,6 +166,15 @@ public class TestContainerLauncher { Assert.assertNull(containerLauncher.foundErrors); containerLauncher.stop(); + + // change configuration MR_AM_CONTAINERLAUNCHER_THREADPOOL_INITIAL_SIZE + // and verify initialPoolSize value. 
+ Configuration conf = new Configuration(); + conf.setInt(MRJobConfig.MR_AM_CONTAINERLAUNCHER_THREADPOOL_INITIAL_SIZE, + 20); + containerLauncher = new CustomContainerLauncher(context); + containerLauncher.init(conf); + Assert.assertEquals(containerLauncher.initialPoolSize, 20); } @Test(timeout = 5000) @@ -187,7 +198,7 @@ public class TestContainerLauncher { ThreadPoolExecutor threadPool = containerLauncher.getThreadPool(); // 10 different hosts - containerLauncher.expectedCorePoolSize = ContainerLauncherImpl.INITIAL_POOL_SIZE; + containerLauncher.expectedCorePoolSize = containerLauncher.initialPoolSize; for (int i = 0; i < 10; i++) { containerLauncher.handle(new ContainerLauncherEvent(taskAttemptId, containerId, "host" + i + ":1234", null, diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/MRJobConfig.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/MRJobConfig.java index 9f671cd9c3b..3aa304a6677 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/MRJobConfig.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/MRJobConfig.java @@ -504,6 +504,14 @@ public interface MRJobConfig { public static final int DEFAULT_MR_AM_CONTAINERLAUNCHER_THREAD_COUNT_LIMIT = 500; + /** + * The initial size of thread pool to launch containers in the app master + */ + public static final String MR_AM_CONTAINERLAUNCHER_THREADPOOL_INITIAL_SIZE = + MR_AM_PREFIX+"containerlauncher.threadpool-initial-size"; + public static final int DEFAULT_MR_AM_CONTAINERLAUNCHER_THREADPOOL_INITIAL_SIZE = + 10; + /** Number of threads to handle job client RPC requests.*/ public static final String MR_AM_JOB_CLIENT_THREAD_COUNT = MR_AM_PREFIX + "job.client.thread-count"; diff --git 
a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/resources/mapred-default.xml b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/resources/mapred-default.xml index d7bec9c1852..820c1ac8d80 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/resources/mapred-default.xml +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/resources/mapred-default.xml @@ -1694,4 +1694,12 @@ calculated as (heapSize / mapreduce.heap.memory-mb.ratio). </description> </property> + +<property> + <name>yarn.app.mapreduce.am.containerlauncher.threadpool-initial-size</name> + <value>10</value> + <description>The initial size of thread pool to launch containers in the + app master. + </description> +</property> </configuration>