HADOOP-12321. Make JvmPauseMonitor an AbstractService. (Sunil G via Stevel) [includes HDFS-8947 MAPREDUCE-6462 and YARN-4072]

This commit is contained in:
Steve Loughran 2015-12-06 17:42:56 +00:00
parent 742632e346
commit 65f395226b
15 changed files with 123 additions and 59 deletions

View File

@ -635,6 +635,9 @@ Release 2.9.0 - UNRELEASED
IMPROVEMENTS IMPROVEMENTS
HADOOP-12321. Make JvmPauseMonitor an AbstractService.
(Sunil G via Stevel)
OPTIMIZATIONS OPTIMIZATIONS
BUG FIXES BUG FIXES

View File

@ -28,6 +28,7 @@ import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory; import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.classification.InterfaceAudience; import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.service.AbstractService;
import com.google.common.base.Joiner; import com.google.common.base.Joiner;
import com.google.common.base.Preconditions; import com.google.common.base.Preconditions;
@ -43,7 +44,7 @@ import com.google.common.collect.Sets;
* detected, the thread logs a message. * detected, the thread logs a message.
*/ */
@InterfaceAudience.Private @InterfaceAudience.Private
public class JvmPauseMonitor { public class JvmPauseMonitor extends AbstractService {
private static final Log LOG = LogFactory.getLog( private static final Log LOG = LogFactory.getLog(
JvmPauseMonitor.class); JvmPauseMonitor.class);
@ -51,13 +52,13 @@ public class JvmPauseMonitor {
private static final long SLEEP_INTERVAL_MS = 500; private static final long SLEEP_INTERVAL_MS = 500;
/** log WARN if we detect a pause longer than this threshold */ /** log WARN if we detect a pause longer than this threshold */
private final long warnThresholdMs; private long warnThresholdMs;
private static final String WARN_THRESHOLD_KEY = private static final String WARN_THRESHOLD_KEY =
"jvm.pause.warn-threshold.ms"; "jvm.pause.warn-threshold.ms";
private static final long WARN_THRESHOLD_DEFAULT = 10000; private static final long WARN_THRESHOLD_DEFAULT = 10000;
/** log INFO if we detect a pause longer than this threshold */ /** log INFO if we detect a pause longer than this threshold */
private final long infoThresholdMs; private long infoThresholdMs;
private static final String INFO_THRESHOLD_KEY = private static final String INFO_THRESHOLD_KEY =
"jvm.pause.info-threshold.ms"; "jvm.pause.info-threshold.ms";
private static final long INFO_THRESHOLD_DEFAULT = 1000; private static final long INFO_THRESHOLD_DEFAULT = 1000;
@ -69,25 +70,28 @@ public class JvmPauseMonitor {
private Thread monitorThread; private Thread monitorThread;
private volatile boolean shouldRun = true; private volatile boolean shouldRun = true;
public JvmPauseMonitor(Configuration conf) { public JvmPauseMonitor() {
super(JvmPauseMonitor.class.getName());
}
@Override
protected void serviceInit(Configuration conf) throws Exception {
this.warnThresholdMs = conf.getLong(WARN_THRESHOLD_KEY, WARN_THRESHOLD_DEFAULT); this.warnThresholdMs = conf.getLong(WARN_THRESHOLD_KEY, WARN_THRESHOLD_DEFAULT);
this.infoThresholdMs = conf.getLong(INFO_THRESHOLD_KEY, INFO_THRESHOLD_DEFAULT); this.infoThresholdMs = conf.getLong(INFO_THRESHOLD_KEY, INFO_THRESHOLD_DEFAULT);
super.serviceInit(conf);
} }
public void start() { @Override
Preconditions.checkState(monitorThread == null, protected void serviceStart() throws Exception {
"Already started");
monitorThread = new Daemon(new Monitor()); monitorThread = new Daemon(new Monitor());
if (shouldRun) { monitorThread.start();
monitorThread.start(); super.serviceStart();
} else {
LOG.warn("stop() was called before start() completed");
}
} }
public void stop() { @Override
protected void serviceStop() throws Exception {
shouldRun = false; shouldRun = false;
if (isStarted()) { if (monitorThread != null) {
monitorThread.interrupt(); monitorThread.interrupt();
try { try {
monitorThread.join(); monitorThread.join();
@ -95,6 +99,7 @@ public class JvmPauseMonitor {
Thread.currentThread().interrupt(); Thread.currentThread().interrupt();
} }
} }
super.serviceStop();
} }
public boolean isStarted() { public boolean isStarted() {
@ -212,8 +217,11 @@ public class JvmPauseMonitor {
* with a 1GB heap will very quickly go into "GC hell" and result in * with a 1GB heap will very quickly go into "GC hell" and result in
* log messages about the GC pauses. * log messages about the GC pauses.
*/ */
@SuppressWarnings("resource")
public static void main(String []args) throws Exception { public static void main(String []args) throws Exception {
new JvmPauseMonitor(new Configuration()).start(); JvmPauseMonitor monitor = new JvmPauseMonitor();
monitor.init(new Configuration());
monitor.start();
List<String> list = Lists.newArrayList(); List<String> list = Lists.newArrayList();
int i = 0; int i = 0;
while (true) { while (true) {

View File

@ -18,7 +18,11 @@
package org.apache.hadoop.metrics2.source; package org.apache.hadoop.metrics2.source;
import org.junit.After;
import org.junit.Assert;
import org.junit.Rule;
import org.junit.Test; import org.junit.Test;
import org.junit.rules.Timeout;
import static org.mockito.Mockito.*; import static org.mockito.Mockito.*;
import static org.apache.hadoop.test.MetricsAsserts.*; import static org.apache.hadoop.test.MetricsAsserts.*;
@ -26,6 +30,9 @@ import static org.apache.hadoop.test.MetricsAsserts.*;
import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.metrics2.MetricsCollector; import org.apache.hadoop.metrics2.MetricsCollector;
import org.apache.hadoop.metrics2.MetricsRecordBuilder; import org.apache.hadoop.metrics2.MetricsRecordBuilder;
import org.apache.hadoop.service.ServiceOperations;
import org.apache.hadoop.service.ServiceStateException;
import org.apache.hadoop.test.GenericTestUtils;
import org.apache.hadoop.util.JvmPauseMonitor; import org.apache.hadoop.util.JvmPauseMonitor;
import static org.apache.hadoop.metrics2.source.JvmMetricsInfo.*; import static org.apache.hadoop.metrics2.source.JvmMetricsInfo.*;
@ -33,8 +40,23 @@ import static org.apache.hadoop.metrics2.impl.MsInfo.*;
public class TestJvmMetrics { public class TestJvmMetrics {
@Test public void testPresence() { @Rule
JvmPauseMonitor pauseMonitor = new JvmPauseMonitor(new Configuration()); public Timeout timeout = new Timeout(30000);
private JvmPauseMonitor pauseMonitor;
/**
* Robust shutdown of the pause monitor if it hasn't been stopped already.
*/
@After
public void teardown() {
ServiceOperations.stop(pauseMonitor);
}
@Test
public void testPresence() {
pauseMonitor = new JvmPauseMonitor();
pauseMonitor.init(new Configuration());
pauseMonitor.start();
JvmMetrics jvmMetrics = new JvmMetrics("test", "test"); JvmMetrics jvmMetrics = new JvmMetrics("test", "test");
jvmMetrics.setPauseMonitor(pauseMonitor); jvmMetrics.setPauseMonitor(pauseMonitor);
MetricsRecordBuilder rb = getMetrics(jvmMetrics); MetricsRecordBuilder rb = getMetrics(jvmMetrics);
@ -54,4 +76,48 @@ public class TestJvmMetrics {
verify(rb).addCounter(eq(info), anyLong()); verify(rb).addCounter(eq(info), anyLong());
} }
} }
@Test
public void testDoubleStop() throws Throwable {
pauseMonitor = new JvmPauseMonitor();
pauseMonitor.init(new Configuration());
pauseMonitor.start();
pauseMonitor.stop();
pauseMonitor.stop();
}
@Test
public void testDoubleStart() throws Throwable {
pauseMonitor = new JvmPauseMonitor();
pauseMonitor.init(new Configuration());
pauseMonitor.start();
pauseMonitor.start();
pauseMonitor.stop();
}
@Test
public void testStopBeforeStart() throws Throwable {
pauseMonitor = new JvmPauseMonitor();
try {
pauseMonitor.init(new Configuration());
pauseMonitor.stop();
pauseMonitor.start();
Assert.fail("Expected an exception, got " + pauseMonitor);
} catch (ServiceStateException e) {
GenericTestUtils.assertExceptionContains("cannot enter state", e);
}
}
@Test
public void testStopBeforeInit() throws Throwable {
pauseMonitor = new JvmPauseMonitor();
try {
pauseMonitor.stop();
pauseMonitor.init(new Configuration());
Assert.fail("Expected an exception, got " + pauseMonitor);
} catch (ServiceStateException e) {
GenericTestUtils.assertExceptionContains("cannot enter state", e);
}
}
} }

View File

@ -243,7 +243,8 @@ public class RpcProgramNfs3 extends RpcProgram implements Nfs3Interface {
@Override @Override
public void startDaemons() { public void startDaemons() {
if (pauseMonitor == null) { if (pauseMonitor == null) {
pauseMonitor = new JvmPauseMonitor(config); pauseMonitor = new JvmPauseMonitor();
pauseMonitor.init(config);
pauseMonitor.start(); pauseMonitor.start();
metrics.getJvmMetrics().setPauseMonitor(pauseMonitor); metrics.getJvmMetrics().setPauseMonitor(pauseMonitor);
} }

View File

@ -893,6 +893,9 @@ Release 2.9.0 - UNRELEASED
HDFS-9414. Refactor reconfiguration of ClientDatanodeProtocol for HDFS-9414. Refactor reconfiguration of ClientDatanodeProtocol for
reusability. (Xiaobing Zhou via Arpit Agarwal) reusability. (Xiaobing Zhou via Arpit Agarwal)
HDFS-8947. NameNode, DataNode and NFS gateway to support JvmPauseMonitor as
a service. (Sunil G via Stevel)
OPTIMIZATIONS OPTIMIZATIONS
BUG FIXES BUG FIXES

View File

@ -1244,7 +1244,8 @@ public class DataNode extends ReconfigurableBase
registerMXBean(); registerMXBean();
initDataXceiver(conf); initDataXceiver(conf);
startInfoServer(conf); startInfoServer(conf);
pauseMonitor = new JvmPauseMonitor(conf); pauseMonitor = new JvmPauseMonitor();
pauseMonitor.init(conf);
pauseMonitor.start(); pauseMonitor.start();
// BlockPoolTokenSecretManager is required to create ipc server. // BlockPoolTokenSecretManager is required to create ipc server.

View File

@ -691,7 +691,8 @@ public class NameNode implements NameNodeStatusMXBean {
httpServer.setFSImage(getFSImage()); httpServer.setFSImage(getFSImage());
} }
pauseMonitor = new JvmPauseMonitor(conf); pauseMonitor = new JvmPauseMonitor();
pauseMonitor.init(conf);
pauseMonitor.start(); pauseMonitor.start();
metrics.getJvmMetrics().setPauseMonitor(pauseMonitor); metrics.getJvmMetrics().setPauseMonitor(pauseMonitor);

View File

@ -145,7 +145,8 @@ public class JobHistoryServer extends CompositeService {
DefaultMetricsSystem.initialize("JobHistoryServer"); DefaultMetricsSystem.initialize("JobHistoryServer");
JvmMetrics jm = JvmMetrics.initSingleton("JobHistoryServer", null); JvmMetrics jm = JvmMetrics.initSingleton("JobHistoryServer", null);
pauseMonitor = new JvmPauseMonitor(getConfig()); pauseMonitor = new JvmPauseMonitor();
addService(pauseMonitor);
jm.setPauseMonitor(pauseMonitor); jm.setPauseMonitor(pauseMonitor);
super.serviceInit(config); super.serviceInit(config);
@ -198,16 +199,12 @@ public class JobHistoryServer extends CompositeService {
@Override @Override
protected void serviceStart() throws Exception { protected void serviceStart() throws Exception {
pauseMonitor.start();
super.serviceStart(); super.serviceStart();
} }
@Override @Override
protected void serviceStop() throws Exception { protected void serviceStop() throws Exception {
DefaultMetricsSystem.shutdown(); DefaultMetricsSystem.shutdown();
if (pauseMonitor != null) {
pauseMonitor.stop();
}
super.serviceStop(); super.serviceStop();
} }

View File

@ -77,7 +77,6 @@ public class TestJobHistoryServer {
Configuration config = new Configuration(); Configuration config = new Configuration();
historyServer.init(config); historyServer.init(config);
assertEquals(STATE.INITED, historyServer.getServiceState()); assertEquals(STATE.INITED, historyServer.getServiceState());
assertEquals(6, historyServer.getServices().size());
HistoryClientService historyService = historyServer.getClientService(); HistoryClientService historyService = historyServer.getClientService();
assertNotNull(historyServer.getClientService()); assertNotNull(historyServer.getClientService());
assertEquals(STATE.INITED, historyService.getServiceState()); assertEquals(STATE.INITED, historyService.getServiceState());

View File

@ -63,6 +63,10 @@ Release 2.9.0 - UNRELEASED
IMPROVEMENTS IMPROVEMENTS
YARN-4072. ApplicationHistoryServer, WebAppProxyServer, NodeManager and
ResourceManager to support JvmPauseMonitor as a service.
(Sunil G via Stevel)
OPTIMIZATIONS OPTIMIZATIONS
BUG FIXES BUG FIXES

View File

@ -103,7 +103,8 @@ public class ApplicationHistoryServer extends CompositeService {
DefaultMetricsSystem.initialize("ApplicationHistoryServer"); DefaultMetricsSystem.initialize("ApplicationHistoryServer");
JvmMetrics jm = JvmMetrics.initSingleton("ApplicationHistoryServer", null); JvmMetrics jm = JvmMetrics.initSingleton("ApplicationHistoryServer", null);
pauseMonitor = new JvmPauseMonitor(conf); pauseMonitor = new JvmPauseMonitor();
addService(pauseMonitor);
jm.setPauseMonitor(pauseMonitor); jm.setPauseMonitor(pauseMonitor);
super.serviceInit(conf); super.serviceInit(conf);
} }
@ -116,9 +117,6 @@ public class ApplicationHistoryServer extends CompositeService {
throw new YarnRuntimeException("Failed to login", ie); throw new YarnRuntimeException("Failed to login", ie);
} }
if (pauseMonitor != null) {
pauseMonitor.start();
}
super.serviceStart(); super.serviceStart();
startWebApp(); startWebApp();
} }
@ -128,9 +126,6 @@ public class ApplicationHistoryServer extends CompositeService {
if (webApp != null) { if (webApp != null) {
webApp.stop(); webApp.stop();
} }
if (pauseMonitor != null) {
pauseMonitor.stop();
}
DefaultMetricsSystem.shutdown(); DefaultMetricsSystem.shutdown();
super.serviceStop(); super.serviceStop();
} }

View File

@ -73,7 +73,6 @@ public class TestApplicationHistoryServer {
historyServer = new ApplicationHistoryServer(); historyServer = new ApplicationHistoryServer();
historyServer.init(config); historyServer.init(config);
assertEquals(STATE.INITED, historyServer.getServiceState()); assertEquals(STATE.INITED, historyServer.getServiceState());
assertEquals(5, historyServer.getServices().size());
ApplicationHistoryClientService historyService = ApplicationHistoryClientService historyService =
historyServer.getClientService(); historyServer.getClientService();
assertNotNull(historyServer.getClientService()); assertNotNull(historyServer.getClientService());

View File

@ -344,7 +344,8 @@ public class NodeManager extends CompositeService
dispatcher.register(NodeManagerEventType.class, this); dispatcher.register(NodeManagerEventType.class, this);
addService(dispatcher); addService(dispatcher);
pauseMonitor = new JvmPauseMonitor(conf); pauseMonitor = new JvmPauseMonitor();
addService(pauseMonitor);
metrics.getJvmMetrics().setPauseMonitor(pauseMonitor); metrics.getJvmMetrics().setPauseMonitor(pauseMonitor);
DefaultMetricsSystem.initialize("NodeManager"); DefaultMetricsSystem.initialize("NodeManager");
@ -364,7 +365,6 @@ public class NodeManager extends CompositeService
} catch (IOException e) { } catch (IOException e) {
throw new YarnRuntimeException("Failed NodeManager login", e); throw new YarnRuntimeException("Failed NodeManager login", e);
} }
pauseMonitor.start();
super.serviceStart(); super.serviceStart();
} }
@ -376,9 +376,6 @@ public class NodeManager extends CompositeService
try { try {
super.serviceStop(); super.serviceStop();
DefaultMetricsSystem.shutdown(); DefaultMetricsSystem.shutdown();
if (pauseMonitor != null) {
pauseMonitor.stop();
}
} finally { } finally {
// YARN-3641: NM's services stop get failed shouldn't block the // YARN-3641: NM's services stop get failed shouldn't block the
// release of NMLevelDBStore. // release of NMLevelDBStore.

View File

@ -518,7 +518,8 @@ public class ResourceManager extends CompositeService implements Recoverable {
DefaultMetricsSystem.initialize("ResourceManager"); DefaultMetricsSystem.initialize("ResourceManager");
JvmMetrics jm = JvmMetrics.initSingleton("ResourceManager", null); JvmMetrics jm = JvmMetrics.initSingleton("ResourceManager", null);
pauseMonitor = new JvmPauseMonitor(conf); pauseMonitor = new JvmPauseMonitor();
addService(pauseMonitor);
jm.setPauseMonitor(pauseMonitor); jm.setPauseMonitor(pauseMonitor);
// Initialize the Reservation system // Initialize the Reservation system
@ -574,8 +575,6 @@ public class ResourceManager extends CompositeService implements Recoverable {
// need events to move to further states. // need events to move to further states.
rmStore.start(); rmStore.start();
pauseMonitor.start();
if(recoveryEnabled) { if(recoveryEnabled) {
try { try {
LOG.info("Recovery started"); LOG.info("Recovery started");
@ -602,10 +601,6 @@ public class ResourceManager extends CompositeService implements Recoverable {
super.serviceStop(); super.serviceStop();
if (pauseMonitor != null) {
pauseMonitor.stop();
}
DefaultMetricsSystem.shutdown(); DefaultMetricsSystem.shutdown();
if (rmContext != null) { if (rmContext != null) {
RMStateStore store = rmContext.getStateStore(); RMStateStore store = rmContext.getStateStore();

View File

@ -67,7 +67,8 @@ public class WebAppProxyServer extends CompositeService {
DefaultMetricsSystem.initialize("WebAppProxyServer"); DefaultMetricsSystem.initialize("WebAppProxyServer");
JvmMetrics jm = JvmMetrics.initSingleton("WebAppProxyServer", null); JvmMetrics jm = JvmMetrics.initSingleton("WebAppProxyServer", null);
pauseMonitor = new JvmPauseMonitor(conf); pauseMonitor = new JvmPauseMonitor();
addService(pauseMonitor);
jm.setPauseMonitor(pauseMonitor); jm.setPauseMonitor(pauseMonitor);
super.serviceInit(config); super.serviceInit(config);
@ -75,9 +76,6 @@ public class WebAppProxyServer extends CompositeService {
@Override @Override
protected void serviceStart() throws Exception { protected void serviceStart() throws Exception {
if (pauseMonitor != null) {
pauseMonitor.start();
}
super.serviceStart(); super.serviceStart();
} }
@ -85,9 +83,6 @@ public class WebAppProxyServer extends CompositeService {
protected void serviceStop() throws Exception { protected void serviceStop() throws Exception {
super.serviceStop(); super.serviceStop();
DefaultMetricsSystem.shutdown(); DefaultMetricsSystem.shutdown();
if (pauseMonitor != null) {
pauseMonitor.stop();
}
} }
/** /**