HBASE-24075: Fix a race between master shutdown and metrics (re)init
JMXCacheBuster resets the metrics state at various points in time. These
events can potentially race with a master shutdown. When the master is
tearing down, metrics initialization can touch a lot of unsafe state,
for example invalidated FS objects. To avoid this, this patch makes
the getMetrics() a no-op when the master is either stopped or in the
process of shutting down. Additionally, getClusterId() when the server
is shutting down is made a no-op.
Simulating a test for this is a bit tricky but with the patch I don't
locally see the long stacktraces from the jira.
Signed-off-by: Michael Stack <stack@apache.org>
(cherry picked from commit 6f213e9d5a
)
This commit is contained in:
parent
40caac9b61
commit
9384b84552
|
@ -29,6 +29,11 @@ import org.apache.yetus.audience.InterfaceAudience;
|
||||||
@InterfaceAudience.Private
|
@InterfaceAudience.Private
|
||||||
public interface MetricsMasterWrapper {
|
public interface MetricsMasterWrapper {
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Returns if the master is currently running and is not attempting to shutdown.
|
||||||
|
*/
|
||||||
|
boolean isRunning();
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Get ServerName
|
* Get ServerName
|
||||||
*/
|
*/
|
||||||
|
|
|
@ -82,7 +82,9 @@ public class MetricsMasterSourceImpl
|
||||||
MetricsRecordBuilder metricsRecordBuilder = metricsCollector.addRecord(metricsName);
|
MetricsRecordBuilder metricsRecordBuilder = metricsCollector.addRecord(metricsName);
|
||||||
|
|
||||||
// masterWrapper can be null because this function is called inside of init.
|
// masterWrapper can be null because this function is called inside of init.
|
||||||
if (masterWrapper != null) {
|
// If the master is already stopped or has initiated a shutdown, no point in registering the
|
||||||
|
// metrics again.
|
||||||
|
if (masterWrapper != null && masterWrapper.isRunning()) {
|
||||||
metricsRecordBuilder
|
metricsRecordBuilder
|
||||||
.addGauge(Interns.info(MERGE_PLAN_COUNT_NAME, MERGE_PLAN_COUNT_DESC),
|
.addGauge(Interns.info(MERGE_PLAN_COUNT_NAME, MERGE_PLAN_COUNT_DESC),
|
||||||
masterWrapper.getMergePlanCount())
|
masterWrapper.getMergePlanCount())
|
||||||
|
|
|
@ -25,6 +25,7 @@ import org.apache.hadoop.conf.Configuration;
|
||||||
import org.apache.hadoop.fs.FileSystem;
|
import org.apache.hadoop.fs.FileSystem;
|
||||||
import org.apache.hadoop.fs.Path;
|
import org.apache.hadoop.fs.Path;
|
||||||
import org.apache.hadoop.hbase.ClusterId;
|
import org.apache.hadoop.hbase.ClusterId;
|
||||||
|
import org.apache.hadoop.hbase.Server;
|
||||||
import org.apache.hadoop.hbase.util.FSUtils;
|
import org.apache.hadoop.hbase.util.FSUtils;
|
||||||
import org.apache.yetus.audience.InterfaceAudience;
|
import org.apache.yetus.audience.InterfaceAudience;
|
||||||
import org.slf4j.Logger;
|
import org.slf4j.Logger;
|
||||||
|
@ -46,8 +47,8 @@ public class CachedClusterId {
|
||||||
public static final Logger LOG = LoggerFactory.getLogger(CachedClusterId.class);
|
public static final Logger LOG = LoggerFactory.getLogger(CachedClusterId.class);
|
||||||
private static final int MAX_FETCH_TIMEOUT_MS = 10000;
|
private static final int MAX_FETCH_TIMEOUT_MS = 10000;
|
||||||
|
|
||||||
private Path rootDir;
|
private final Path rootDir;
|
||||||
private FileSystem fs;
|
private final FileSystem fs;
|
||||||
|
|
||||||
// When true, indicates that a FileSystem fetch of ClusterID is in progress. This is used to
|
// When true, indicates that a FileSystem fetch of ClusterID is in progress. This is used to
|
||||||
// avoid multiple fetches from FS and let only one thread fetch the information.
|
// avoid multiple fetches from FS and let only one thread fetch the information.
|
||||||
|
@ -58,12 +59,15 @@ public class CachedClusterId {
|
||||||
// Immutable once set and read multiple times.
|
// Immutable once set and read multiple times.
|
||||||
private ClusterId clusterId;
|
private ClusterId clusterId;
|
||||||
|
|
||||||
|
private final Server server;
|
||||||
|
|
||||||
// cache stats for testing.
|
// cache stats for testing.
|
||||||
private AtomicInteger cacheMisses = new AtomicInteger(0);
|
private AtomicInteger cacheMisses = new AtomicInteger(0);
|
||||||
|
|
||||||
public CachedClusterId(Configuration conf) throws IOException {
|
public CachedClusterId(Server server, Configuration conf) throws IOException {
|
||||||
rootDir = FSUtils.getRootDir(conf);
|
this.rootDir = FSUtils.getRootDir(conf);
|
||||||
fs = rootDir.getFileSystem(conf);
|
this.fs = rootDir.getFileSystem(conf);
|
||||||
|
this.server = server;
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
@ -130,9 +134,12 @@ public class CachedClusterId {
|
||||||
* trying get from a clean cache.
|
* trying get from a clean cache.
|
||||||
*
|
*
|
||||||
* @return ClusterId by reading from FileSystem or null in any error case or cluster ID does
|
* @return ClusterId by reading from FileSystem or null in any error case or cluster ID does
|
||||||
* not exist on the file system.
|
* not exist on the file system or if the server initiated a tear down.
|
||||||
*/
|
*/
|
||||||
public String getFromCacheOrFetch() {
|
public String getFromCacheOrFetch() {
|
||||||
|
if (server.isStopping() || server.isStopped()) {
|
||||||
|
return null;
|
||||||
|
}
|
||||||
String id = getClusterId();
|
String id = getClusterId();
|
||||||
if (id != null) {
|
if (id != null) {
|
||||||
return id;
|
return id;
|
||||||
|
|
|
@ -574,7 +574,7 @@ public class HMaster extends HRegionServer implements MasterServices {
|
||||||
this.metaRegionLocationCache = null;
|
this.metaRegionLocationCache = null;
|
||||||
this.activeMasterManager = null;
|
this.activeMasterManager = null;
|
||||||
}
|
}
|
||||||
cachedClusterId = new CachedClusterId(conf);
|
cachedClusterId = new CachedClusterId(this, conf);
|
||||||
} catch (Throwable t) {
|
} catch (Throwable t) {
|
||||||
// Make sure we log the exception. HMaster is often started via reflection and the
|
// Make sure we log the exception. HMaster is often started via reflection and the
|
||||||
// cause of failed startup is lost.
|
// cause of failed startup is lost.
|
||||||
|
|
|
@ -129,6 +129,10 @@ public class MetricsMasterWrapperImpl implements MetricsMasterWrapper {
|
||||||
return serverManager.getDeadServers().size();
|
return serverManager.getDeadServers().size();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Override public boolean isRunning() {
|
||||||
|
return !(master.isStopped() || master.isStopping());
|
||||||
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public String getServerName() {
|
public String getServerName() {
|
||||||
ServerName serverName = master.getServerName();
|
ServerName serverName = master.getServerName();
|
||||||
|
|
|
@ -76,7 +76,8 @@ public class TestCachedClusterId {
|
||||||
@Test
|
@Test
|
||||||
public void testMultiThreadedGetClusterId() throws Exception {
|
public void testMultiThreadedGetClusterId() throws Exception {
|
||||||
Configuration conf = TEST_UTIL.getConfiguration();
|
Configuration conf = TEST_UTIL.getConfiguration();
|
||||||
CachedClusterId cachedClusterId = new CachedClusterId(conf);
|
CachedClusterId cachedClusterId = new CachedClusterId(TEST_UTIL.getHBaseCluster().getMaster(),
|
||||||
|
conf);
|
||||||
TestContext context = new TestContext(conf);
|
TestContext context = new TestContext(conf);
|
||||||
int numThreads = 16;
|
int numThreads = 16;
|
||||||
for (int i = 0; i < numThreads; i++) {
|
for (int i = 0; i < numThreads; i++) {
|
||||||
|
|
Loading…
Reference in New Issue