From 1aee5f0552c93a1e27820b9468771f838a0712c0 Mon Sep 17 00:00:00 2001
From: chenxu14 <47170471+chenxu14@users.noreply.github.com>
Date: Sat, 12 Oct 2019 03:05:54 +0800
Subject: [PATCH] HBASE-23083 Collect Executor status info periodically and report to (#664)

metrics system

Signed-off-by: stack
---
 .../org/apache/hadoop/hbase/HConstants.java   |   3 +
 .../hadoop/hbase/ExecutorStatusChore.java     |  86 +++++++++++++++
 .../hbase/executor/ExecutorService.java       |   8 ++
 .../hbase/regionserver/HRegionServer.java     |  48 +++++++--
 .../hadoop/hbase/TestExecutorStatusChore.java | 102 ++++++++++++++++++
 5 files changed, 238 insertions(+), 9 deletions(-)
 create mode 100644 hbase-server/src/main/java/org/apache/hadoop/hbase/ExecutorStatusChore.java
 create mode 100644 hbase-server/src/test/java/org/apache/hadoop/hbase/TestExecutorStatusChore.java

diff --git a/hbase-common/src/main/java/org/apache/hadoop/hbase/HConstants.java b/hbase-common/src/main/java/org/apache/hadoop/hbase/HConstants.java
index a999e73b02e..8ee48c9f529 100644
--- a/hbase-common/src/main/java/org/apache/hadoop/hbase/HConstants.java
+++ b/hbase-common/src/main/java/org/apache/hadoop/hbase/HConstants.java
@@ -1191,6 +1191,9 @@ public final class HConstants {
       "hbase.node.health.failure.threshold";
   public static final int DEFAULT_HEALTH_FAILURE_THRESHOLD = 3;
 
+  public static final String EXECUTOR_STATUS_COLLECT_ENABLED =
+      "hbase.executors.status.collect.enabled";
+  public static final boolean DEFAULT_EXECUTOR_STATUS_COLLECT_ENABLED = true;
 
   /**
    * Setting to activate, or not, the publication of the status by the master. Default
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/ExecutorStatusChore.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/ExecutorStatusChore.java
new file mode 100644
index 00000000000..da03eba0351
--- /dev/null
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/ExecutorStatusChore.java
@@ -0,0 +1,86 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase;
+
+import java.util.Map;
+import org.apache.hadoop.hbase.executor.ExecutorService;
+import org.apache.hadoop.hbase.executor.ExecutorService.ExecutorStatus;
+import org.apache.hadoop.hbase.regionserver.MetricsRegionServerSource;
+import org.apache.hadoop.hbase.regionserver.MetricsRegionServerSourceImpl;
+import org.apache.hadoop.hbase.util.Pair;
+import org.apache.hadoop.metrics2.lib.DynamicMetricsRegistry;
+import org.apache.hadoop.metrics2.lib.MutableGaugeLong;
+import org.apache.hadoop.util.StringUtils;
+import org.apache.yetus.audience.InterfaceAudience;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.apache.hbase.thirdparty.com.google.common.annotations.VisibleForTesting;
+
+/**
+ * The ExecutorStatusChore collects executor status info periodically
+ * and reports it to the metrics system.
+ */
+@InterfaceAudience.Private
+public class ExecutorStatusChore extends ScheduledChore {
+  private static final Logger LOG = LoggerFactory.getLogger(ExecutorStatusChore.class);
+  public static final String WAKE_FREQ = "hbase.executors.status.collect.period";
+  public static final int DEFAULT_WAKE_FREQ = 60000;
+  private ExecutorService service;
+  private DynamicMetricsRegistry metricsRegistry;
+
+  public ExecutorStatusChore(int sleepTime, Stoppable stopper, ExecutorService service,
+      MetricsRegionServerSource metrics) {
+    super("ExecutorStatusChore", stopper, sleepTime);
+    LOG.info("ExecutorStatusChore runs every {}", StringUtils.formatTime(sleepTime));
+    this.service = service;
+    this.metricsRegistry = ((MetricsRegionServerSourceImpl) metrics).getMetricsRegistry();
+  }
+
+  @Override
+  protected void chore() {
+    try {
+      // thread pool monitor
+      Map<String, ExecutorStatus> statuses = service.getAllExecutorStatuses();
+      for (Map.Entry<String, ExecutorStatus> statusEntry : statuses.entrySet()) {
+        String name = statusEntry.getKey();
+        // The executor name is generated by ExecutorType#getExecutorName and contains the
+        // ExecutorType and the server name (separated by '-'); only the ExecutorType is needed.
+        String poolName = name.split("-")[0];
+        ExecutorStatus status = statusEntry.getValue();
+        MutableGaugeLong queued = metricsRegistry.getGauge(poolName + "_queued", 0L);
+        MutableGaugeLong running = metricsRegistry.getGauge(poolName + "_running", 0L);
+        int queueSize = status.getQueuedEvents().size();
+        int runningSize = status.getRunning().size();
+        if (queueSize > 0) {
+          LOG.warn("{}'s size info, queued: {}, running: {}", poolName, queueSize, runningSize);
+        }
+        queued.set(queueSize);
+        running.set(runningSize);
+      }
+    } catch (Throwable e) {
+      LOG.error(e.getMessage(), e);
+    }
+  }
+
+  @VisibleForTesting
+  public Pair<Long, Long> getExecutorStatus(String poolName) {
+    MutableGaugeLong running = metricsRegistry.getGauge(poolName + "_running", 0L);
+    MutableGaugeLong queued = metricsRegistry.getGauge(poolName + "_queued", 0L);
+    return new Pair<Long, Long>(running.value(), queued.value());
+  }
+}
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/executor/ExecutorService.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/executor/ExecutorService.java
index 71d8ea5fa02..ea788acc8c5 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/executor/ExecutorService.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/executor/ExecutorService.java
@@ -313,6 +313,14 @@ public class ExecutorService {
       this.running = running;
     }
 
+    public List<EventHandler> getQueuedEvents() {
+      return queuedEvents;
+    }
+
+    public List<RunningEventStatus> getRunning() {
+      return running;
+    }
+
     /**
      * Dump a textual representation of the executor's status
      * to the given writer.
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java
index a0a6b4cbc5f..11fd8c7e54b 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java
@@ -69,6 +69,7 @@ import org.apache.hadoop.hbase.ChoreService;
 import org.apache.hadoop.hbase.ClockOutOfSyncException;
 import org.apache.hadoop.hbase.CoordinatedStateManager;
 import org.apache.hadoop.hbase.DoNotRetryIOException;
+import org.apache.hadoop.hbase.ExecutorStatusChore;
 import org.apache.hadoop.hbase.HBaseConfiguration;
 import org.apache.hadoop.hbase.HBaseInterfaceAudience;
 import org.apache.hadoop.hbase.HConstants;
@@ -442,6 +443,9 @@ public class HRegionServer extends HasThread implements
   /** The health check chore. */
   private HealthCheckChore healthCheckChore;
 
+  /** The Executor status collect chore. */
+  private ExecutorStatusChore executorStatusChore;
+
   /** The nonce manager chore. */
   private ScheduledChore nonceManagerChore;
 
@@ -1922,6 +1926,14 @@
         HConstants.DEFAULT_THREAD_WAKE_FREQUENCY);
       healthCheckChore = new HealthCheckChore(sleepTime, this, getConfiguration());
     }
+    // Executor status collect thread.
+    if (this.conf.getBoolean(HConstants.EXECUTOR_STATUS_COLLECT_ENABLED,
+        HConstants.DEFAULT_EXECUTOR_STATUS_COLLECT_ENABLED)) {
+      int sleepTime = this.conf.getInt(ExecutorStatusChore.WAKE_FREQ,
+          ExecutorStatusChore.DEFAULT_WAKE_FREQ);
+      executorStatusChore = new ExecutorStatusChore(sleepTime, this, this.getExecutorService(),
+          this.getRegionServerMetrics().getMetricsSource());
+    }
 
     this.walRoller = new LogRoller(this, this);
     this.flushThroughputController = FlushThroughputControllerFactory.create(this, conf);
@@ -1970,25 +1982,42 @@
       conf.getInt("hbase.regionserver.executor.switch.rpc.throttle.threads", 1));
 
     Threads.setDaemonThreadRunning(this.walRoller.getThread(), getName() + ".logRoller",
-        uncaughtExceptionHandler);
+      uncaughtExceptionHandler);
     if (this.cacheFlusher != null) {
       this.cacheFlusher.start(uncaughtExceptionHandler);
     }
 
     Threads.setDaemonThreadRunning(this.procedureResultReporter,
       getName() + ".procedureResultReporter", uncaughtExceptionHandler);
-    if (this.compactionChecker != null) choreService.scheduleChore(compactionChecker);
-    if (this.periodicFlusher != null) choreService.scheduleChore(periodicFlusher);
-    if (this.healthCheckChore != null) choreService.scheduleChore(healthCheckChore);
-    if (this.nonceManagerChore != null) choreService.scheduleChore(nonceManagerChore);
-    if (this.storefileRefresher != null) choreService.scheduleChore(storefileRefresher);
-    if (this.movedRegionsCleaner != null) choreService.scheduleChore(movedRegionsCleaner);
-    if (this.fsUtilizationChore != null) choreService.scheduleChore(fsUtilizationChore);
+    if (this.compactionChecker != null) {
+      choreService.scheduleChore(compactionChecker);
+    }
+    if (this.periodicFlusher != null) {
+      choreService.scheduleChore(periodicFlusher);
+    }
+    if (this.healthCheckChore != null) {
+      choreService.scheduleChore(healthCheckChore);
+    }
+    if (this.executorStatusChore != null) {
+      choreService.scheduleChore(executorStatusChore);
+    }
+    if (this.nonceManagerChore != null) {
+      choreService.scheduleChore(nonceManagerChore);
+    }
+    if (this.storefileRefresher != null) {
+      choreService.scheduleChore(storefileRefresher);
+    }
+    if (this.movedRegionsCleaner != null) {
+      choreService.scheduleChore(movedRegionsCleaner);
+    }
+    if (this.fsUtilizationChore != null) {
+      choreService.scheduleChore(fsUtilizationChore);
+    }
 
     // Leases is not a Thread. Internally it runs a daemon thread. If it gets
     // an unhandled exception, it will just exit.
     Threads.setDaemonThreadRunning(this.leases.getThread(), getName() + ".leaseChecker",
-        uncaughtExceptionHandler);
+      uncaughtExceptionHandler);
 
     // Create the log splitting worker and start it
     // set a smaller retries to fast fail otherwise splitlogworker could be blocked for
@@ -2500,6 +2529,7 @@
       choreService.cancelChore(compactionChecker);
       choreService.cancelChore(periodicFlusher);
       choreService.cancelChore(healthCheckChore);
+      choreService.cancelChore(executorStatusChore);
       choreService.cancelChore(storefileRefresher);
       choreService.cancelChore(movedRegionsCleaner);
       choreService.cancelChore(fsUtilizationChore);
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/TestExecutorStatusChore.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/TestExecutorStatusChore.java
new file mode 100644
index 00000000000..ce3e8ffdf3f
--- /dev/null
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/TestExecutorStatusChore.java
@@ -0,0 +1,102 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
+import org.apache.hadoop.hbase.executor.EventType;
+import org.apache.hadoop.hbase.executor.ExecutorService;
+import org.apache.hadoop.hbase.executor.ExecutorType;
+import org.apache.hadoop.hbase.executor.TestExecutorService.TestEventHandler;
+import org.apache.hadoop.hbase.regionserver.MetricsRegionServerSource;
+import org.apache.hadoop.hbase.regionserver.MetricsRegionServerSourceFactory;
+import org.apache.hadoop.hbase.regionserver.MetricsRegionServerSourceImpl;
+import org.apache.hadoop.hbase.testclassification.MiscTests;
+import org.apache.hadoop.hbase.testclassification.SmallTests;
+import org.apache.hadoop.hbase.util.Pair;
+import org.junit.ClassRule;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+@Category({MiscTests.class, SmallTests.class})
+public class TestExecutorStatusChore {
+  @ClassRule
+  public static final HBaseClassTestRule CLASS_RULE =
+      HBaseClassTestRule.forClass(TestExecutorStatusChore.class);
+
+  private static final Logger LOG = LoggerFactory.getLogger(TestExecutorStatusChore.class);
+
+  @Test
+  public void testMetricsCollect() throws Exception {
+    int maxThreads = 5;
+    int maxTries = 10;
+    int sleepInterval = 10;
+
+    Server mockedServer = mock(Server.class);
+    when(mockedServer.getConfiguration()).thenReturn(HBaseConfiguration.create());
+
+    // Start an executor service pool with max 5 threads
+    ExecutorService executorService = new ExecutorService("unit_test");
+    executorService.startExecutorService(
+        ExecutorType.RS_PARALLEL_SEEK, maxThreads);
+
+    MetricsRegionServerSource serverSource = CompatibilitySingletonFactory
+        .getInstance(MetricsRegionServerSourceFactory.class).createServer(null);
+    assertTrue(serverSource instanceof MetricsRegionServerSourceImpl);
+
+    ExecutorStatusChore statusChore = new ExecutorStatusChore(60000,
+        mockedServer, executorService, serverSource);
+
+    AtomicBoolean lock = new AtomicBoolean(true);
+    AtomicInteger counter = new AtomicInteger(0);
+
+    for (int i = 0; i < maxThreads + 1; i++) {
+      executorService.submit(new TestEventHandler(mockedServer,
+          EventType.RS_PARALLEL_SEEK, lock, counter));
+    }
+
+    // The TestEventHandler will increment counter when it starts.
+    int tries = 0;
+    while (counter.get() < maxThreads && tries < maxTries) {
+      LOG.info("Waiting for all event handlers to start...");
+      Thread.sleep(sleepInterval);
+      tries++;
+    }
+
+    // Assert that pool is at max threads.
+    assertEquals(maxThreads, counter.get());
+
+    statusChore.chore();
+    Pair<Long, Long> executorStatus = statusChore.getExecutorStatus("RS_PARALLEL_SEEK");
+    assertEquals(maxThreads, executorStatus.getFirst().intValue()); // running
+    assertEquals(1, executorStatus.getSecond().intValue()); // pending
+
+    // Now release the event handlers blocked on the lock
+    synchronized (lock) {
+      lock.set(false);
+      lock.notifyAll();
+    }
+    executorService.shutdown();
+  }
+}
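
For reference, a minimal sketch of how the two settings introduced by this patch could be tuned from configuration code. The keys and defaults (hbase.executors.status.collect.enabled, default true; hbase.executors.status.collect.period, default 60000 ms) are the ones added above; the wrapper class and main method below are illustrative assumptions, not part of the change.

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HConstants;

// Hypothetical example class, not part of the patch.
public class ExecutorStatusChoreConfigExample {
  public static void main(String[] args) {
    Configuration conf = HBaseConfiguration.create();
    // Enabled by default (DEFAULT_EXECUTOR_STATUS_COLLECT_ENABLED = true); set to
    // false to skip scheduling the chore when the region server starts.
    conf.setBoolean(HConstants.EXECUTOR_STATUS_COLLECT_ENABLED, true);
    // Collection period in milliseconds; the patch's default (DEFAULT_WAKE_FREQ) is 60000.
    conf.setInt("hbase.executors.status.collect.period", 30000);
    // With the chore enabled, each executor pool publishes two gauges in the region
    // server metrics source, named "<ExecutorType>_queued" and "<ExecutorType>_running",
    // e.g. RS_PARALLEL_SEEK_queued and RS_PARALLEL_SEEK_running.
  }
}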