HADOOP-12321. Make JvmPauseMonitor an AbstractService. (Sunil G via Stevel) [includes HDFS-8947 MAPREDUCE-6462 and YARN-4072]

This commit is contained in:
Steve Loughran 2015-12-06 17:42:56 +00:00
parent 78a07e99dd
commit af14458de7
15 changed files with 123 additions and 59 deletions

View File

@ -8,6 +8,9 @@ Release 2.9.0 - UNRELEASED
IMPROVEMENTS
HADOOP-12321. Make JvmPauseMonitor an AbstractService.
(Sunil G via Stevel)
OPTIMIZATIONS
BUG FIXES

View File

@ -28,6 +28,7 @@ import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.service.AbstractService;
import com.google.common.base.Joiner;
import com.google.common.base.Preconditions;
@ -43,7 +44,7 @@ import com.google.common.collect.Sets;
* detected, the thread logs a message.
*/
@InterfaceAudience.Private
public class JvmPauseMonitor {
public class JvmPauseMonitor extends AbstractService {
private static final Log LOG = LogFactory.getLog(
JvmPauseMonitor.class);
@ -51,13 +52,13 @@ public class JvmPauseMonitor {
private static final long SLEEP_INTERVAL_MS = 500;
/** log WARN if we detect a pause longer than this threshold */
private final long warnThresholdMs;
private long warnThresholdMs;
private static final String WARN_THRESHOLD_KEY =
"jvm.pause.warn-threshold.ms";
private static final long WARN_THRESHOLD_DEFAULT = 10000;
/** log INFO if we detect a pause longer than this threshold */
private final long infoThresholdMs;
private long infoThresholdMs;
private static final String INFO_THRESHOLD_KEY =
"jvm.pause.info-threshold.ms";
private static final long INFO_THRESHOLD_DEFAULT = 1000;
@ -69,25 +70,28 @@ public class JvmPauseMonitor {
private Thread monitorThread;
private volatile boolean shouldRun = true;
public JvmPauseMonitor(Configuration conf) {
public JvmPauseMonitor() {
super(JvmPauseMonitor.class.getName());
}
@Override
protected void serviceInit(Configuration conf) throws Exception {
this.warnThresholdMs = conf.getLong(WARN_THRESHOLD_KEY, WARN_THRESHOLD_DEFAULT);
this.infoThresholdMs = conf.getLong(INFO_THRESHOLD_KEY, INFO_THRESHOLD_DEFAULT);
super.serviceInit(conf);
}
public void start() {
Preconditions.checkState(monitorThread == null,
"Already started");
@Override
protected void serviceStart() throws Exception {
monitorThread = new Daemon(new Monitor());
if (shouldRun) {
monitorThread.start();
} else {
LOG.warn("stop() was called before start() completed");
}
super.serviceStart();
}
public void stop() {
@Override
protected void serviceStop() throws Exception {
shouldRun = false;
if (isStarted()) {
if (monitorThread != null) {
monitorThread.interrupt();
try {
monitorThread.join();
@ -95,6 +99,7 @@ public class JvmPauseMonitor {
Thread.currentThread().interrupt();
}
}
super.serviceStop();
}
public boolean isStarted() {
@ -212,8 +217,11 @@ public class JvmPauseMonitor {
* with a 1GB heap will very quickly go into "GC hell" and result in
* log messages about the GC pauses.
*/
@SuppressWarnings("resource")
public static void main(String []args) throws Exception {
new JvmPauseMonitor(new Configuration()).start();
JvmPauseMonitor monitor = new JvmPauseMonitor();
monitor.init(new Configuration());
monitor.start();
List<String> list = Lists.newArrayList();
int i = 0;
while (true) {

View File

@ -18,7 +18,11 @@
package org.apache.hadoop.metrics2.source;
import org.junit.After;
import org.junit.Assert;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.Timeout;
import static org.mockito.Mockito.*;
import static org.apache.hadoop.test.MetricsAsserts.*;
@ -26,6 +30,9 @@ import static org.apache.hadoop.test.MetricsAsserts.*;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.metrics2.MetricsCollector;
import org.apache.hadoop.metrics2.MetricsRecordBuilder;
import org.apache.hadoop.service.ServiceOperations;
import org.apache.hadoop.service.ServiceStateException;
import org.apache.hadoop.test.GenericTestUtils;
import org.apache.hadoop.util.JvmPauseMonitor;
import static org.apache.hadoop.metrics2.source.JvmMetricsInfo.*;
@ -33,8 +40,23 @@ import static org.apache.hadoop.metrics2.impl.MsInfo.*;
public class TestJvmMetrics {
@Test public void testPresence() {
JvmPauseMonitor pauseMonitor = new JvmPauseMonitor(new Configuration());
@Rule
public Timeout timeout = new Timeout(30000);
private JvmPauseMonitor pauseMonitor;
/**
* Robust shutdown of the pause monitor if it hasn't been stopped already.
*/
@After
public void teardown() {
ServiceOperations.stop(pauseMonitor);
}
@Test
public void testPresence() {
pauseMonitor = new JvmPauseMonitor();
pauseMonitor.init(new Configuration());
pauseMonitor.start();
JvmMetrics jvmMetrics = new JvmMetrics("test", "test");
jvmMetrics.setPauseMonitor(pauseMonitor);
MetricsRecordBuilder rb = getMetrics(jvmMetrics);
@ -54,4 +76,48 @@ public class TestJvmMetrics {
verify(rb).addCounter(eq(info), anyLong());
}
}
@Test
public void testDoubleStop() throws Throwable {
pauseMonitor = new JvmPauseMonitor();
pauseMonitor.init(new Configuration());
pauseMonitor.start();
pauseMonitor.stop();
pauseMonitor.stop();
}
@Test
public void testDoubleStart() throws Throwable {
pauseMonitor = new JvmPauseMonitor();
pauseMonitor.init(new Configuration());
pauseMonitor.start();
pauseMonitor.start();
pauseMonitor.stop();
}
@Test
public void testStopBeforeStart() throws Throwable {
pauseMonitor = new JvmPauseMonitor();
try {
pauseMonitor.init(new Configuration());
pauseMonitor.stop();
pauseMonitor.start();
Assert.fail("Expected an exception, got " + pauseMonitor);
} catch (ServiceStateException e) {
GenericTestUtils.assertExceptionContains("cannot enter state", e);
}
}
@Test
public void testStopBeforeInit() throws Throwable {
pauseMonitor = new JvmPauseMonitor();
try {
pauseMonitor.stop();
pauseMonitor.init(new Configuration());
Assert.fail("Expected an exception, got " + pauseMonitor);
} catch (ServiceStateException e) {
GenericTestUtils.assertExceptionContains("cannot enter state", e);
}
}
}

View File

@ -243,7 +243,8 @@ public class RpcProgramNfs3 extends RpcProgram implements Nfs3Interface {
@Override
public void startDaemons() {
if (pauseMonitor == null) {
pauseMonitor = new JvmPauseMonitor(config);
pauseMonitor = new JvmPauseMonitor();
pauseMonitor.init(config);
pauseMonitor.start();
metrics.getJvmMetrics().setPauseMonitor(pauseMonitor);
}

View File

@ -17,6 +17,9 @@ Release 2.9.0 - UNRELEASED
HDFS-9414. Refactor reconfiguration of ClientDatanodeProtocol for
reusability. (Xiaobing Zhou via Arpit Agarwal)
HDFS-8947. NameNode, DataNode and NFS gateway to support JvmPauseMonitor as
a service. (Sunil G via Stevel)
OPTIMIZATIONS
BUG FIXES

View File

@ -1249,7 +1249,8 @@ public class DataNode extends ReconfigurableBase
registerMXBean();
initDataXceiver(conf);
startInfoServer(conf);
pauseMonitor = new JvmPauseMonitor(conf);
pauseMonitor = new JvmPauseMonitor();
pauseMonitor.init(conf);
pauseMonitor.start();
// BlockPoolTokenSecretManager is required to create ipc server.

View File

@ -691,7 +691,8 @@ public class NameNode implements NameNodeStatusMXBean {
httpServer.setFSImage(getFSImage());
}
pauseMonitor = new JvmPauseMonitor(conf);
pauseMonitor = new JvmPauseMonitor();
pauseMonitor.init(conf);
pauseMonitor.start();
metrics.getJvmMetrics().setPauseMonitor(pauseMonitor);

View File

@ -145,7 +145,8 @@ public class JobHistoryServer extends CompositeService {
DefaultMetricsSystem.initialize("JobHistoryServer");
JvmMetrics jm = JvmMetrics.initSingleton("JobHistoryServer", null);
pauseMonitor = new JvmPauseMonitor(getConfig());
pauseMonitor = new JvmPauseMonitor();
addService(pauseMonitor);
jm.setPauseMonitor(pauseMonitor);
super.serviceInit(config);
@ -198,16 +199,12 @@ public class JobHistoryServer extends CompositeService {
@Override
protected void serviceStart() throws Exception {
pauseMonitor.start();
super.serviceStart();
}
@Override
protected void serviceStop() throws Exception {
DefaultMetricsSystem.shutdown();
if (pauseMonitor != null) {
pauseMonitor.stop();
}
super.serviceStop();
}

View File

@ -77,7 +77,6 @@ public class TestJobHistoryServer {
Configuration config = new Configuration();
historyServer.init(config);
assertEquals(STATE.INITED, historyServer.getServiceState());
assertEquals(6, historyServer.getServices().size());
HistoryClientService historyService = historyServer.getClientService();
assertNotNull(historyServer.getClientService());
assertEquals(STATE.INITED, historyService.getServiceState());

View File

@ -8,6 +8,10 @@ Release 2.9.0 - UNRELEASED
IMPROVEMENTS
YARN-4072. ApplicationHistoryServer, WebAppProxyServer, NodeManager and
ResourceManager to support JvmPauseMonitor as a service.
(Sunil G via Stevel)
OPTIMIZATIONS
BUG FIXES

View File

@ -103,7 +103,8 @@ public class ApplicationHistoryServer extends CompositeService {
DefaultMetricsSystem.initialize("ApplicationHistoryServer");
JvmMetrics jm = JvmMetrics.initSingleton("ApplicationHistoryServer", null);
pauseMonitor = new JvmPauseMonitor(conf);
pauseMonitor = new JvmPauseMonitor();
addService(pauseMonitor);
jm.setPauseMonitor(pauseMonitor);
super.serviceInit(conf);
}
@ -116,9 +117,6 @@ public class ApplicationHistoryServer extends CompositeService {
throw new YarnRuntimeException("Failed to login", ie);
}
if (pauseMonitor != null) {
pauseMonitor.start();
}
super.serviceStart();
startWebApp();
}
@ -128,9 +126,6 @@ public class ApplicationHistoryServer extends CompositeService {
if (webApp != null) {
webApp.stop();
}
if (pauseMonitor != null) {
pauseMonitor.stop();
}
DefaultMetricsSystem.shutdown();
super.serviceStop();
}

View File

@ -73,7 +73,6 @@ public class TestApplicationHistoryServer {
historyServer = new ApplicationHistoryServer();
historyServer.init(config);
assertEquals(STATE.INITED, historyServer.getServiceState());
assertEquals(5, historyServer.getServices().size());
ApplicationHistoryClientService historyService =
historyServer.getClientService();
assertNotNull(historyServer.getClientService());

View File

@ -344,7 +344,8 @@ public class NodeManager extends CompositeService
dispatcher.register(NodeManagerEventType.class, this);
addService(dispatcher);
pauseMonitor = new JvmPauseMonitor(conf);
pauseMonitor = new JvmPauseMonitor();
addService(pauseMonitor);
metrics.getJvmMetrics().setPauseMonitor(pauseMonitor);
DefaultMetricsSystem.initialize("NodeManager");
@ -364,7 +365,6 @@ public class NodeManager extends CompositeService
} catch (IOException e) {
throw new YarnRuntimeException("Failed NodeManager login", e);
}
pauseMonitor.start();
super.serviceStart();
}
@ -376,9 +376,6 @@ public class NodeManager extends CompositeService
try {
super.serviceStop();
DefaultMetricsSystem.shutdown();
if (pauseMonitor != null) {
pauseMonitor.stop();
}
} finally {
// YARN-3641: NM's services stop get failed shouldn't block the
// release of NMLevelDBStore.

View File

@ -518,7 +518,8 @@ public class ResourceManager extends CompositeService implements Recoverable {
DefaultMetricsSystem.initialize("ResourceManager");
JvmMetrics jm = JvmMetrics.initSingleton("ResourceManager", null);
pauseMonitor = new JvmPauseMonitor(conf);
pauseMonitor = new JvmPauseMonitor();
addService(pauseMonitor);
jm.setPauseMonitor(pauseMonitor);
// Initialize the Reservation system
@ -574,8 +575,6 @@ public class ResourceManager extends CompositeService implements Recoverable {
// need events to move to further states.
rmStore.start();
pauseMonitor.start();
if(recoveryEnabled) {
try {
LOG.info("Recovery started");
@ -602,10 +601,6 @@ public class ResourceManager extends CompositeService implements Recoverable {
super.serviceStop();
if (pauseMonitor != null) {
pauseMonitor.stop();
}
DefaultMetricsSystem.shutdown();
if (rmContext != null) {
RMStateStore store = rmContext.getStateStore();

View File

@ -67,7 +67,8 @@ public class WebAppProxyServer extends CompositeService {
DefaultMetricsSystem.initialize("WebAppProxyServer");
JvmMetrics jm = JvmMetrics.initSingleton("WebAppProxyServer", null);
pauseMonitor = new JvmPauseMonitor(conf);
pauseMonitor = new JvmPauseMonitor();
addService(pauseMonitor);
jm.setPauseMonitor(pauseMonitor);
super.serviceInit(config);
@ -75,9 +76,6 @@ public class WebAppProxyServer extends CompositeService {
@Override
protected void serviceStart() throws Exception {
if (pauseMonitor != null) {
pauseMonitor.start();
}
super.serviceStart();
}
@ -85,9 +83,6 @@ public class WebAppProxyServer extends CompositeService {
protected void serviceStop() throws Exception {
super.serviceStop();
DefaultMetricsSystem.shutdown();
if (pauseMonitor != null) {
pauseMonitor.stop();
}
}
/**