HBASE-24075: Fix a race between master shutdown and metrics (re)init

JMXCacheBuster resets the metrics state at various points in time. These
events can potentially race with a master shutdown. When the master is
tearing down, metrics initialization can touch a lot of unsafe state,
for example invalidated FS objects. To avoid this, this patch makes
the getMetrics() a no-op when the master is either stopped or in the
process of shutting down. Additionally, getClusterId() when the server
is shutting down is made a no-op.

Simulating a test for this is a bit tricky but with the patch I don't
locally see the long stacktraces from the jira.

Signed-off-by: Michael Stack <stack@apache.org>
(cherry picked from commit 6f213e9d5a)
(cherry picked from commit 9384b84552)
This commit is contained in:
Bharath Vissapragada 2020-03-31 00:16:15 -07:00
parent a85e73e916
commit 4d1309b6c4
6 changed files with 28 additions and 9 deletions

View File

@ -29,6 +29,11 @@ import org.apache.yetus.audience.InterfaceAudience;
@InterfaceAudience.Private @InterfaceAudience.Private
public interface MetricsMasterWrapper { public interface MetricsMasterWrapper {
/**
* Returns if the master is currently running and is not attempting to shutdown.
*/
boolean isRunning();
/** /**
* Get ServerName * Get ServerName
*/ */

View File

@ -82,7 +82,9 @@ public class MetricsMasterSourceImpl
MetricsRecordBuilder metricsRecordBuilder = metricsCollector.addRecord(metricsName); MetricsRecordBuilder metricsRecordBuilder = metricsCollector.addRecord(metricsName);
// masterWrapper can be null because this function is called inside of init. // masterWrapper can be null because this function is called inside of init.
if (masterWrapper != null) { // If the master is already stopped or has initiated a shutdown, no point in registering the
// metrics again.
if (masterWrapper != null && masterWrapper.isRunning()) {
metricsRecordBuilder metricsRecordBuilder
.addGauge(Interns.info(MERGE_PLAN_COUNT_NAME, MERGE_PLAN_COUNT_DESC), .addGauge(Interns.info(MERGE_PLAN_COUNT_NAME, MERGE_PLAN_COUNT_DESC),
masterWrapper.getMergePlanCount()) masterWrapper.getMergePlanCount())

View File

@ -25,6 +25,7 @@ import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path; import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.ClusterId; import org.apache.hadoop.hbase.ClusterId;
import org.apache.hadoop.hbase.Server;
import org.apache.hadoop.hbase.util.FSUtils; import org.apache.hadoop.hbase.util.FSUtils;
import org.apache.yetus.audience.InterfaceAudience; import org.apache.yetus.audience.InterfaceAudience;
import org.slf4j.Logger; import org.slf4j.Logger;
@ -46,8 +47,8 @@ public class CachedClusterId {
public static final Logger LOG = LoggerFactory.getLogger(CachedClusterId.class); public static final Logger LOG = LoggerFactory.getLogger(CachedClusterId.class);
private static final int MAX_FETCH_TIMEOUT_MS = 10000; private static final int MAX_FETCH_TIMEOUT_MS = 10000;
private Path rootDir; private final Path rootDir;
private FileSystem fs; private final FileSystem fs;
// When true, indicates that a FileSystem fetch of ClusterID is in progress. This is used to // When true, indicates that a FileSystem fetch of ClusterID is in progress. This is used to
// avoid multiple fetches from FS and let only one thread fetch the information. // avoid multiple fetches from FS and let only one thread fetch the information.
@ -58,12 +59,15 @@ public class CachedClusterId {
// Immutable once set and read multiple times. // Immutable once set and read multiple times.
private ClusterId clusterId; private ClusterId clusterId;
private final Server server;
// cache stats for testing. // cache stats for testing.
private AtomicInteger cacheMisses = new AtomicInteger(0); private AtomicInteger cacheMisses = new AtomicInteger(0);
public CachedClusterId(Configuration conf) throws IOException { public CachedClusterId(Server server, Configuration conf) throws IOException {
rootDir = FSUtils.getRootDir(conf); this.rootDir = FSUtils.getRootDir(conf);
fs = rootDir.getFileSystem(conf); this.fs = rootDir.getFileSystem(conf);
this.server = server;
} }
/** /**
@ -130,9 +134,12 @@ public class CachedClusterId {
* trying get from a clean cache. * trying get from a clean cache.
* *
* @return ClusterId by reading from FileSystem or null in any error case or cluster ID does * @return ClusterId by reading from FileSystem or null in any error case or cluster ID does
* not exist on the file system. * not exist on the file system or if the server initiated a tear down.
*/ */
public String getFromCacheOrFetch() { public String getFromCacheOrFetch() {
if (server.isStopping() || server.isStopped()) {
return null;
}
String id = getClusterId(); String id = getClusterId();
if (id != null) { if (id != null) {
return id; return id;

View File

@ -574,7 +574,7 @@ public class HMaster extends HRegionServer implements MasterServices {
this.metaRegionLocationCache = null; this.metaRegionLocationCache = null;
this.activeMasterManager = null; this.activeMasterManager = null;
} }
cachedClusterId = new CachedClusterId(conf); cachedClusterId = new CachedClusterId(this, conf);
} catch (Throwable t) { } catch (Throwable t) {
// Make sure we log the exception. HMaster is often started via reflection and the // Make sure we log the exception. HMaster is often started via reflection and the
// cause of failed startup is lost. // cause of failed startup is lost.

View File

@ -129,6 +129,10 @@ public class MetricsMasterWrapperImpl implements MetricsMasterWrapper {
return serverManager.getDeadServers().size(); return serverManager.getDeadServers().size();
} }
@Override public boolean isRunning() {
return !(master.isStopped() || master.isStopping());
}
@Override @Override
public String getServerName() { public String getServerName() {
ServerName serverName = master.getServerName(); ServerName serverName = master.getServerName();

View File

@ -76,7 +76,8 @@ public class TestCachedClusterId {
@Test @Test
public void testMultiThreadedGetClusterId() throws Exception { public void testMultiThreadedGetClusterId() throws Exception {
Configuration conf = TEST_UTIL.getConfiguration(); Configuration conf = TEST_UTIL.getConfiguration();
CachedClusterId cachedClusterId = new CachedClusterId(conf); CachedClusterId cachedClusterId = new CachedClusterId(TEST_UTIL.getHBaseCluster().getMaster(),
conf);
TestContext context = new TestContext(conf); TestContext context = new TestContext(conf);
int numThreads = 16; int numThreads = 16;
for (int i = 0; i < numThreads; i++) { for (int i = 0; i < numThreads; i++) {