diff --git a/hadoop-yarn-project/CHANGES.txt b/hadoop-yarn-project/CHANGES.txt
index 9aa10374471..732719232bd 100644
--- a/hadoop-yarn-project/CHANGES.txt
+++ b/hadoop-yarn-project/CHANGES.txt
@@ -143,6 +143,9 @@ Release 2.9.0 - UNRELEASED
YARN-4648. Move preemption related tests from TestFairScheduler to
TestFairSchedulerPreemption. (Kai Sasaki via ozawa)
+ YARN-4697. NM aggregation thread pool is not bound by
+ limits (haibochen via rkanter)
+
OPTIMIZATIONS
BUG FIXES
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java
index f0c7e6d9437..69e5ba51f3d 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java
@@ -759,6 +759,11 @@ public class YarnConfiguration extends Configuration {
public static final String NM_LOG_DIRS = NM_PREFIX + "log-dirs";
public static final String DEFAULT_NM_LOG_DIRS = "/tmp/logs";
+ /** The number of threads to handle log aggregation in node manager. */
+ public static final String NM_LOG_AGGREGATION_THREAD_POOL_SIZE =
+ NM_PREFIX + "logaggregation.threadpool-size-max";
+ public static final int DEFAULT_NM_LOG_AGGREGATION_THREAD_POOL_SIZE = 100;
+
public static final String NM_RESOURCEMANAGER_MINIMUM_VERSION =
NM_PREFIX + "resourcemanager.minimum.version";
public static final String DEFAULT_NM_RESOURCEMANAGER_MINIMUM_VERSION = "NONE";
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml
index cd4074a8d1e..72c89c93458 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml
@@ -1226,6 +1226,14 @@
1.0
+
+
+ Thread pool size for LogAggregationService in Node Manager.
+
+ yarn.nodemanager.logaggregation.threadpool-size-max
+ 100
+
+
Percentage of CPU that can be allocated
for containers. This setting allows users to limit the amount of
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/LogAggregationService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/LogAggregationService.java
index 64115355ac7..2d6b9008ebb 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/LogAggregationService.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/LogAggregationService.java
@@ -102,7 +102,8 @@ public class LogAggregationService extends AbstractService implements
private final ConcurrentMap appLogAggregators;
- private final ExecutorService threadPool;
+ @VisibleForTesting
+ ExecutorService threadPool;
public LogAggregationService(Dispatcher dispatcher, Context context,
DeletionService deletionService, LocalDirsHandlerService dirsHandler) {
@@ -113,10 +114,6 @@ public class LogAggregationService extends AbstractService implements
this.dirsHandler = dirsHandler;
this.appLogAggregators =
new ConcurrentHashMap();
- this.threadPool = HadoopExecutors.newCachedThreadPool(
- new ThreadFactoryBuilder()
- .setNameFormat("LogAggregationService #%d")
- .build());
}
protected void serviceInit(Configuration conf) throws Exception {
@@ -126,7 +123,11 @@ public class LogAggregationService extends AbstractService implements
this.remoteRootLogDirSuffix =
conf.get(YarnConfiguration.NM_REMOTE_APP_LOG_DIR_SUFFIX,
YarnConfiguration.DEFAULT_NM_REMOTE_APP_LOG_DIR_SUFFIX);
-
+ int threadPoolSize = getAggregatorThreadPoolSize(conf);
+ this.threadPool = HadoopExecutors.newFixedThreadPool(threadPoolSize,
+ new ThreadFactoryBuilder()
+ .setNameFormat("LogAggregationService #%d")
+ .build());
super.serviceInit(conf);
}
@@ -487,4 +488,26 @@ public class LogAggregationService extends AbstractService implements
public NodeId getNodeId() {
return this.nodeId;
}
+
+
+ private int getAggregatorThreadPoolSize(Configuration conf) {
+ int threadPoolSize;
+ try {
+ threadPoolSize = conf.getInt(YarnConfiguration
+ .NM_LOG_AGGREGATION_THREAD_POOL_SIZE,
+ YarnConfiguration.DEFAULT_NM_LOG_AGGREGATION_THREAD_POOL_SIZE);
+ } catch (NumberFormatException ex) {
+ LOG.warn("Invalid thread pool size. Setting it to the default value " +
+ "in YarnConfiguration");
+ threadPoolSize = YarnConfiguration.
+ DEFAULT_NM_LOG_AGGREGATION_THREAD_POOL_SIZE;
+ }
+ if(threadPoolSize <= 0) {
+ LOG.warn("Invalid thread pool size. Setting it to the default value " +
+ "in YarnConfiguration");
+ threadPoolSize = YarnConfiguration.
+ DEFAULT_NM_LOG_AGGREGATION_THREAD_POOL_SIZE;
+ }
+ return threadPoolSize;
+ }
}
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/TestLogAggregationService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/TestLogAggregationService.java
index 101fef093ee..87c3f27b445 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/TestLogAggregationService.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/TestLogAggregationService.java
@@ -55,6 +55,12 @@ import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.ThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.locks.Lock;
+import java.util.concurrent.locks.ReadWriteLock;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
import org.apache.commons.lang.StringUtils;
import org.apache.commons.logging.LogFactory;
@@ -1040,6 +1046,143 @@ public class TestLogAggregationService extends BaseContainerManagerTest {
return appAcls;
}
+ @Test (timeout = 30000)
+ public void testFixedSizeThreadPool() throws Exception {
+ // store configured thread pool size temporarily for restoration
+ int initThreadPoolSize = conf.getInt(YarnConfiguration
+ .NM_LOG_AGGREGATION_THREAD_POOL_SIZE,
+ YarnConfiguration.DEFAULT_NM_LOG_AGGREGATION_THREAD_POOL_SIZE);
+
+ int threadPoolSize = 3;
+ conf.setInt(YarnConfiguration.NM_LOG_AGGREGATION_THREAD_POOL_SIZE,
+ threadPoolSize);
+
+ DeletionService delSrvc = mock(DeletionService.class);
+
+ LocalDirsHandlerService dirSvc = mock(LocalDirsHandlerService.class);
+ when(dirSvc.getLogDirs()).thenThrow(new RuntimeException());
+
+ LogAggregationService logAggregationService =
+ new LogAggregationService(dispatcher, this.context, delSrvc, dirSvc);
+
+ logAggregationService.init(this.conf);
+ logAggregationService.start();
+
+ ExecutorService executorService = logAggregationService.threadPool;
+
+ // used to block threads in the thread pool because main thread always
+ // acquires the write lock first.
+ final ReadWriteLock rwLock = new ReentrantReadWriteLock();
+ final Lock rLock = rwLock.readLock();
+ final Lock wLock = rwLock.writeLock();
+
+ try {
+ wLock.lock();
+ Runnable runnable = new Runnable() {
+ @Override
+ public void run() {
+ try {
+ // threads in the thread pool running this will be blocked
+ rLock.tryLock(35000, TimeUnit.MILLISECONDS);
+ } catch (InterruptedException e) {
+ e.printStackTrace();
+ } finally {
+ rLock.unlock();
+ }
+ }
+ };
+
+ // submit $(threadPoolSize + 1) runnables to the thread pool. If the thread
+ // pool size is set properly, only $(threadPoolSize) threads will be
+ // created in the thread pool, each of which is blocked on the read lock.
+ for(int i = 0; i < threadPoolSize + 1; i++) {
+ executorService.submit(runnable);
+ }
+
+ // count the number of current running LogAggregationService threads
+ int runningThread = ((ThreadPoolExecutor) executorService).getActiveCount();
+ assertEquals(threadPoolSize, runningThread);
+ }
+ finally {
+ wLock.unlock();
+ }
+
+ logAggregationService.stop();
+ logAggregationService.close();
+
+ // restore the original configurations to avoid side effects
+ conf.setInt(YarnConfiguration.NM_LOG_AGGREGATION_THREAD_POOL_SIZE,
+ initThreadPoolSize);
+ }
+
+ @Test
+ public void testInvalidThreadPoolSizeNaN() throws IOException {
+ testInvalidThreadPoolSizeValue("NaN");
+ }
+
+ @Test
+ public void testInvalidThreadPoolSizeNegative() throws IOException {
+ testInvalidThreadPoolSizeValue("-100");
+ }
+
+ @Test
+ public void testInvalidThreadPoolSizeXLarge() throws IOException {
+ testInvalidThreadPoolSizeValue("11111111111");
+ }
+
+ private void testInvalidThreadPoolSizeValue(final String threadPoolSize)
+ throws IOException {
+ Supplier isInputInvalid = new Supplier() {
+ @Override
+ public Boolean get() {
+ try {
+ int value = Integer.parseInt(threadPoolSize);
+ return value <= 0;
+ } catch (NumberFormatException ex) {
+ return true;
+ }
+ }
+ };
+
+ assertTrue("The thread pool size must be invalid to use with this " +
+ "method", isInputInvalid.get());
+
+
+ // store configured thread pool size temporarily for restoration
+ int initThreadPoolSize = conf.getInt(YarnConfiguration
+ .NM_LOG_AGGREGATION_THREAD_POOL_SIZE,
+ YarnConfiguration.DEFAULT_NM_LOG_AGGREGATION_THREAD_POOL_SIZE);
+
+ conf.set(YarnConfiguration.NM_LOG_AGGREGATION_THREAD_POOL_SIZE,
+ threadPoolSize);
+
+ DeletionService delSrvc = mock(DeletionService.class);
+
+ LocalDirsHandlerService dirSvc = mock(LocalDirsHandlerService.class);
+ when(dirSvc.getLogDirs()).thenThrow(new RuntimeException());
+
+ LogAggregationService logAggregationService =
+ new LogAggregationService(dispatcher, this.context, delSrvc, dirSvc);
+
+ logAggregationService.init(this.conf);
+ logAggregationService.start();
+
+ ThreadPoolExecutor executorService = (ThreadPoolExecutor)
+ logAggregationService.threadPool;
+ assertEquals("The thread pool size should be set to the value of YARN" +
+ ".DEFAULT_NM_LOG_AGGREGATION_THREAD_POOL_SIZE because the configured "
+ + " thread pool size is " + "invalid.",
+ YarnConfiguration.DEFAULT_NM_LOG_AGGREGATION_THREAD_POOL_SIZE,
+ executorService.getMaximumPoolSize());
+
+ logAggregationService.stop();
+ logAggregationService.close();
+
+ // retore original configuration to aviod side effects
+ conf.setInt(YarnConfiguration.NM_LOG_AGGREGATION_THREAD_POOL_SIZE,
+ initThreadPoolSize);
+ }
+
@Test(timeout=20000)
public void testStopAfterError() throws Exception {
DeletionService delSrvc = mock(DeletionService.class);