YARN-4697. NM aggregation thread pool is not bound by limits (haibochen via rkanter)

(cherry picked from commit 954dd57043)
This commit is contained in:
Robert Kanter 2016-02-24 15:00:24 -08:00
parent 92e49cdd04
commit c2098d2470
5 changed files with 188 additions and 6 deletions

View File

@ -82,6 +82,9 @@ Release 2.9.0 - UNRELEASED
YARN-4648. Move preemption related tests from TestFairScheduler to YARN-4648. Move preemption related tests from TestFairScheduler to
TestFairSchedulerPreemption. (Kai Sasaki via ozawa) TestFairSchedulerPreemption. (Kai Sasaki via ozawa)
YARN-4697. NM aggregation thread pool is not bound by
limits (haibochen via rkanter)
OPTIMIZATIONS OPTIMIZATIONS
BUG FIXES BUG FIXES

View File

@ -759,6 +759,11 @@ public class YarnConfiguration extends Configuration {
public static final String NM_LOG_DIRS = NM_PREFIX + "log-dirs"; public static final String NM_LOG_DIRS = NM_PREFIX + "log-dirs";
public static final String DEFAULT_NM_LOG_DIRS = "/tmp/logs"; public static final String DEFAULT_NM_LOG_DIRS = "/tmp/logs";
/** The number of threads to handle log aggregation in node manager. */
public static final String NM_LOG_AGGREGATION_THREAD_POOL_SIZE =
NM_PREFIX + "logaggregation.threadpool-size-max";
public static final int DEFAULT_NM_LOG_AGGREGATION_THREAD_POOL_SIZE = 100;
public static final String NM_RESOURCEMANAGER_MINIMUM_VERSION = public static final String NM_RESOURCEMANAGER_MINIMUM_VERSION =
NM_PREFIX + "resourcemanager.minimum.version"; NM_PREFIX + "resourcemanager.minimum.version";
public static final String DEFAULT_NM_RESOURCEMANAGER_MINIMUM_VERSION = "NONE"; public static final String DEFAULT_NM_RESOURCEMANAGER_MINIMUM_VERSION = "NONE";

View File

@ -1226,6 +1226,14 @@
<value>1.0</value> <value>1.0</value>
</property> </property>
<property>
<description>
Thread pool size for LogAggregationService in Node Manager.
</description>
<name>yarn.nodemanager.logaggregation.threadpool-size-max</name>
<value>100</value>
</property>
<property> <property>
<description>Percentage of CPU that can be allocated <description>Percentage of CPU that can be allocated
for containers. This setting allows users to limit the amount of for containers. This setting allows users to limit the amount of

View File

@ -102,7 +102,8 @@ public class LogAggregationService extends AbstractService implements
private final ConcurrentMap<ApplicationId, AppLogAggregator> appLogAggregators; private final ConcurrentMap<ApplicationId, AppLogAggregator> appLogAggregators;
private final ExecutorService threadPool; @VisibleForTesting
ExecutorService threadPool;
public LogAggregationService(Dispatcher dispatcher, Context context, public LogAggregationService(Dispatcher dispatcher, Context context,
DeletionService deletionService, LocalDirsHandlerService dirsHandler) { DeletionService deletionService, LocalDirsHandlerService dirsHandler) {
@ -113,10 +114,6 @@ public class LogAggregationService extends AbstractService implements
this.dirsHandler = dirsHandler; this.dirsHandler = dirsHandler;
this.appLogAggregators = this.appLogAggregators =
new ConcurrentHashMap<ApplicationId, AppLogAggregator>(); new ConcurrentHashMap<ApplicationId, AppLogAggregator>();
this.threadPool = HadoopExecutors.newCachedThreadPool(
new ThreadFactoryBuilder()
.setNameFormat("LogAggregationService #%d")
.build());
} }
protected void serviceInit(Configuration conf) throws Exception { protected void serviceInit(Configuration conf) throws Exception {
@ -126,7 +123,11 @@ public class LogAggregationService extends AbstractService implements
this.remoteRootLogDirSuffix = this.remoteRootLogDirSuffix =
conf.get(YarnConfiguration.NM_REMOTE_APP_LOG_DIR_SUFFIX, conf.get(YarnConfiguration.NM_REMOTE_APP_LOG_DIR_SUFFIX,
YarnConfiguration.DEFAULT_NM_REMOTE_APP_LOG_DIR_SUFFIX); YarnConfiguration.DEFAULT_NM_REMOTE_APP_LOG_DIR_SUFFIX);
int threadPoolSize = getAggregatorThreadPoolSize(conf);
this.threadPool = HadoopExecutors.newFixedThreadPool(threadPoolSize,
new ThreadFactoryBuilder()
.setNameFormat("LogAggregationService #%d")
.build());
super.serviceInit(conf); super.serviceInit(conf);
} }
@ -487,4 +488,26 @@ public class LogAggregationService extends AbstractService implements
public NodeId getNodeId() { public NodeId getNodeId() {
return this.nodeId; return this.nodeId;
} }
private int getAggregatorThreadPoolSize(Configuration conf) {
int threadPoolSize;
try {
threadPoolSize = conf.getInt(YarnConfiguration
.NM_LOG_AGGREGATION_THREAD_POOL_SIZE,
YarnConfiguration.DEFAULT_NM_LOG_AGGREGATION_THREAD_POOL_SIZE);
} catch (NumberFormatException ex) {
LOG.warn("Invalid thread pool size. Setting it to the default value " +
"in YarnConfiguration");
threadPoolSize = YarnConfiguration.
DEFAULT_NM_LOG_AGGREGATION_THREAD_POOL_SIZE;
}
if(threadPoolSize <= 0) {
LOG.warn("Invalid thread pool size. Setting it to the default value " +
"in YarnConfiguration");
threadPoolSize = YarnConfiguration.
DEFAULT_NM_LOG_AGGREGATION_THREAD_POOL_SIZE;
}
return threadPoolSize;
}
} }

View File

@ -55,6 +55,12 @@ import java.util.Map;
import java.util.Set; import java.util.Set;
import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap; import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import org.apache.commons.lang.StringUtils; import org.apache.commons.lang.StringUtils;
import org.apache.commons.logging.LogFactory; import org.apache.commons.logging.LogFactory;
@ -1040,6 +1046,143 @@ public class TestLogAggregationService extends BaseContainerManagerTest {
return appAcls; return appAcls;
} }
@Test (timeout = 30000)
public void testFixedSizeThreadPool() throws Exception {
// store configured thread pool size temporarily for restoration
int initThreadPoolSize = conf.getInt(YarnConfiguration
.NM_LOG_AGGREGATION_THREAD_POOL_SIZE,
YarnConfiguration.DEFAULT_NM_LOG_AGGREGATION_THREAD_POOL_SIZE);
int threadPoolSize = 3;
conf.setInt(YarnConfiguration.NM_LOG_AGGREGATION_THREAD_POOL_SIZE,
threadPoolSize);
DeletionService delSrvc = mock(DeletionService.class);
LocalDirsHandlerService dirSvc = mock(LocalDirsHandlerService.class);
when(dirSvc.getLogDirs()).thenThrow(new RuntimeException());
LogAggregationService logAggregationService =
new LogAggregationService(dispatcher, this.context, delSrvc, dirSvc);
logAggregationService.init(this.conf);
logAggregationService.start();
ExecutorService executorService = logAggregationService.threadPool;
// used to block threads in the thread pool because main thread always
// acquires the write lock first.
final ReadWriteLock rwLock = new ReentrantReadWriteLock();
final Lock rLock = rwLock.readLock();
final Lock wLock = rwLock.writeLock();
try {
wLock.lock();
Runnable runnable = new Runnable() {
@Override
public void run() {
try {
// threads in the thread pool running this will be blocked
rLock.tryLock(35000, TimeUnit.MILLISECONDS);
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
rLock.unlock();
}
}
};
// submit $(threadPoolSize + 1) runnables to the thread pool. If the thread
// pool size is set properly, only $(threadPoolSize) threads will be
// created in the thread pool, each of which is blocked on the read lock.
for(int i = 0; i < threadPoolSize + 1; i++) {
executorService.submit(runnable);
}
// count the number of current running LogAggregationService threads
int runningThread = ((ThreadPoolExecutor) executorService).getActiveCount();
assertEquals(threadPoolSize, runningThread);
}
finally {
wLock.unlock();
}
logAggregationService.stop();
logAggregationService.close();
// restore the original configurations to avoid side effects
conf.setInt(YarnConfiguration.NM_LOG_AGGREGATION_THREAD_POOL_SIZE,
initThreadPoolSize);
}
@Test
public void testInvalidThreadPoolSizeNaN() throws IOException {
testInvalidThreadPoolSizeValue("NaN");
}
@Test
public void testInvalidThreadPoolSizeNegative() throws IOException {
testInvalidThreadPoolSizeValue("-100");
}
@Test
public void testInvalidThreadPoolSizeXLarge() throws IOException {
testInvalidThreadPoolSizeValue("11111111111");
}
private void testInvalidThreadPoolSizeValue(final String threadPoolSize)
throws IOException {
Supplier<Boolean> isInputInvalid = new Supplier<Boolean>() {
@Override
public Boolean get() {
try {
int value = Integer.parseInt(threadPoolSize);
return value <= 0;
} catch (NumberFormatException ex) {
return true;
}
}
};
assertTrue("The thread pool size must be invalid to use with this " +
"method", isInputInvalid.get());
// store configured thread pool size temporarily for restoration
int initThreadPoolSize = conf.getInt(YarnConfiguration
.NM_LOG_AGGREGATION_THREAD_POOL_SIZE,
YarnConfiguration.DEFAULT_NM_LOG_AGGREGATION_THREAD_POOL_SIZE);
conf.set(YarnConfiguration.NM_LOG_AGGREGATION_THREAD_POOL_SIZE,
threadPoolSize);
DeletionService delSrvc = mock(DeletionService.class);
LocalDirsHandlerService dirSvc = mock(LocalDirsHandlerService.class);
when(dirSvc.getLogDirs()).thenThrow(new RuntimeException());
LogAggregationService logAggregationService =
new LogAggregationService(dispatcher, this.context, delSrvc, dirSvc);
logAggregationService.init(this.conf);
logAggregationService.start();
ThreadPoolExecutor executorService = (ThreadPoolExecutor)
logAggregationService.threadPool;
assertEquals("The thread pool size should be set to the value of YARN" +
".DEFAULT_NM_LOG_AGGREGATION_THREAD_POOL_SIZE because the configured "
+ " thread pool size is " + "invalid.",
YarnConfiguration.DEFAULT_NM_LOG_AGGREGATION_THREAD_POOL_SIZE,
executorService.getMaximumPoolSize());
logAggregationService.stop();
logAggregationService.close();
// retore original configuration to aviod side effects
conf.setInt(YarnConfiguration.NM_LOG_AGGREGATION_THREAD_POOL_SIZE,
initThreadPoolSize);
}
@Test(timeout=20000) @Test(timeout=20000)
public void testStopAfterError() throws Exception { public void testStopAfterError() throws Exception {
DeletionService delSrvc = mock(DeletionService.class); DeletionService delSrvc = mock(DeletionService.class);