HBASE-20169 NPE when calling HBTU.shutdownMiniCluster (TestAssignmentManagerMetrics is flakey); AMENDMENT

This commit is contained in:
Chia-Ping Tsai 2018-05-01 15:16:13 -07:00 committed by Michael Stack
parent b0719ec11b
commit 984fb5bd05
No known key found for this signature in database
GPG Key ID: 9816C7FC8ACC93D2
2 changed files with 36 additions and 11 deletions

View File

@ -264,9 +264,31 @@ public class ProcedureExecutor<TEnvironment> {
private final CopyOnWriteArrayList<ProcedureExecutorListener> listeners = new CopyOnWriteArrayList<>(); private final CopyOnWriteArrayList<ProcedureExecutorListener> listeners = new CopyOnWriteArrayList<>();
private Configuration conf; private Configuration conf;
/**
* Created in the {@link #start(int, boolean)} method and destroyed in {@link #join()}.
* (FIX! Handling resources in #join, rather than just observing, is unexpected.)
* Overridden when we do the ProcedureTestingUtility.testRecoveryAndDoubleExecution trickery
* (should be ok).
*/
private ThreadGroup threadGroup; private ThreadGroup threadGroup;
/**
* Created in the {@link #start(int, boolean)} method and terminated in {@link #join()}.
* (FIX! Handling resources in #join, rather than just observing, is unexpected.)
* Overridden when we do the ProcedureTestingUtility.testRecoveryAndDoubleExecution trickery
* (should be ok).
*/
private CopyOnWriteArrayList<WorkerThread> workerThreads; private CopyOnWriteArrayList<WorkerThread> workerThreads;
/**
* Created in the {@link #start(int, boolean)} method and terminated in {@link #join()}.
* (FIX! Handling resources in #join, rather than just observing, is unexpected.)
* Overridden when we do the ProcedureTestingUtility.testRecoveryAndDoubleExecution trickery
* (should be ok).
*/
private TimeoutExecutorThread timeoutExecutor; private TimeoutExecutorThread timeoutExecutor;
private int corePoolSize; private int corePoolSize;
private int maxPoolSize; private int maxPoolSize;
@ -299,6 +321,7 @@ public class ProcedureExecutor<TEnvironment> {
this.conf = conf; this.conf = conf;
this.checkOwnerSet = conf.getBoolean(CHECK_OWNER_SET_CONF_KEY, DEFAULT_CHECK_OWNER_SET); this.checkOwnerSet = conf.getBoolean(CHECK_OWNER_SET_CONF_KEY, DEFAULT_CHECK_OWNER_SET);
refreshConfiguration(conf); refreshConfiguration(conf);
} }
private void load(final boolean abortOnCorruption) throws IOException { private void load(final boolean abortOnCorruption) throws IOException {
@ -510,11 +533,8 @@ public class ProcedureExecutor<TEnvironment> {
LOG.info("Starting {} core workers (bigger of cpus/4 or 16) with max (burst) worker count={}", LOG.info("Starting {} core workers (bigger of cpus/4 or 16) with max (burst) worker count={}",
corePoolSize, maxPoolSize); corePoolSize, maxPoolSize);
// Create the Thread Group for the executors this.threadGroup = new ThreadGroup("PEWorkerGroup");
threadGroup = new ThreadGroup("PEWorkerGroup"); this.timeoutExecutor = new TimeoutExecutorThread(this, threadGroup);
// Create the timeout executor
timeoutExecutor = new TimeoutExecutorThread(this, threadGroup);
// Create the workers // Create the workers
workerId.set(0); workerId.set(0);
@ -576,22 +596,21 @@ public class ProcedureExecutor<TEnvironment> {
// stop the timeout executor // stop the timeout executor
timeoutExecutor.awaitTermination(); timeoutExecutor.awaitTermination();
timeoutExecutor = null;
// stop the worker threads // stop the worker threads
for (WorkerThread worker: workerThreads) { for (WorkerThread worker: workerThreads) {
worker.awaitTermination(); worker.awaitTermination();
} }
workerThreads = null;
// Destroy the Thread Group for the executors // Destroy the Thread Group for the executors
// TODO: Fix. #join is not the place to destroy resources.
try { try {
threadGroup.destroy(); threadGroup.destroy();
} catch (IllegalThreadStateException e) { } catch (IllegalThreadStateException e) {
LOG.error("ThreadGroup " + threadGroup + " contains running threads; " + e.getMessage()); LOG.error("ThreadGroup {} contains running threads; {}: See STDOUT",
threadGroup.list(); this.threadGroup, e.getMessage());
} finally { // This dumps list of threads on STDOUT.
threadGroup = null; this.threadGroup.list();
} }
// reset the in-memory state for testing // reset the in-memory state for testing

View File

@ -33,6 +33,7 @@ import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Table; import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.client.TableDescriptor; import org.apache.hadoop.hbase.client.TableDescriptor;
import org.apache.hadoop.hbase.client.TableDescriptorBuilder; import org.apache.hadoop.hbase.client.TableDescriptorBuilder;
import org.apache.hadoop.hbase.coprocessor.CoprocessorHost;
import org.apache.hadoop.hbase.master.assignment.AssignmentManager; import org.apache.hadoop.hbase.master.assignment.AssignmentManager;
import org.apache.hadoop.hbase.test.MetricsAssertHelper; import org.apache.hadoop.hbase.test.MetricsAssertHelper;
import org.apache.hadoop.hbase.testclassification.MasterTests; import org.apache.hadoop.hbase.testclassification.MasterTests;
@ -91,6 +92,8 @@ public class TestAssignmentManagerMetrics {
// set a small interval for updating rit metrics // set a small interval for updating rit metrics
conf.setInt(AssignmentManager.RIT_CHORE_INTERVAL_MSEC_CONF_KEY, MSG_INTERVAL); conf.setInt(AssignmentManager.RIT_CHORE_INTERVAL_MSEC_CONF_KEY, MSG_INTERVAL);
// keep rs online so it can report the failed opens.
conf.setBoolean(CoprocessorHost.ABORT_ON_ERROR_KEY, false);
TEST_UTIL.startMiniCluster(1); TEST_UTIL.startMiniCluster(1);
CLUSTER = TEST_UTIL.getHBaseCluster(); CLUSTER = TEST_UTIL.getHBaseCluster();
MASTER = CLUSTER.getMaster(); MASTER = CLUSTER.getMaster();
@ -148,6 +151,9 @@ public class TestAssignmentManagerMetrics {
} }
// Sleep 3 seconds, wait for doMetrics chore catching up // Sleep 3 seconds, wait for doMetrics chore catching up
// The rit count consists of rit and failed opens; see RegionInTransitionStat#update.
// Waiting for the regions in transition to complete makes the assert stable.
TEST_UTIL.waitUntilNoRegionsInTransition();
Thread.sleep(MSG_INTERVAL * 3); Thread.sleep(MSG_INTERVAL * 3);
METRICS_HELPER.assertGauge(MetricsAssignmentManagerSource.RIT_COUNT_NAME, 1, amSource); METRICS_HELPER.assertGauge(MetricsAssignmentManagerSource.RIT_COUNT_NAME, 1, amSource);
METRICS_HELPER.assertGauge(MetricsAssignmentManagerSource.RIT_COUNT_OVER_THRESHOLD_NAME, 1, METRICS_HELPER.assertGauge(MetricsAssignmentManagerSource.RIT_COUNT_OVER_THRESHOLD_NAME, 1,