diff --git a/hbase-common/src/main/java/org/apache/hadoop/hbase/HConstants.java b/hbase-common/src/main/java/org/apache/hadoop/hbase/HConstants.java index b0322b3a35f..6e66f57c783 100644 --- a/hbase-common/src/main/java/org/apache/hadoop/hbase/HConstants.java +++ b/hbase-common/src/main/java/org/apache/hadoop/hbase/HConstants.java @@ -946,7 +946,7 @@ public final class HConstants { /** * Pattern that matches a coprocessor specification. Form is: * - *<coprocessor jar file location> '|' < ['|' <priority> ['|' <arguments>]] + *<coprocessor jar file location> '|' <class name> ['|' <priority> ['|' <arguments>]] * * ...where arguments are <KEY> '=' <VALUE> [,...] *

For example: hdfs:///foo.jar|com.foo.FooRegionObserver|1001|arg1=1,arg2=2 @@ -1137,6 +1137,9 @@ public final class HConstants { "hbase.node.health.failure.threshold"; public static final int DEFAULT_HEALTH_FAILURE_THRESHOLD = 3; + public static final String EXECUTOR_STATUS_COLLECT_ENABLED = + "hbase.executors.status.collect.enabled"; + public static final boolean DEFAULT_EXECUTOR_STATUS_COLLECT_ENABLED = true; /** * Setting to activate, or not, the publication of the status by the master. Default diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/ExecutorStatusChore.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/ExecutorStatusChore.java new file mode 100644 index 00000000000..abe99e95b95 --- /dev/null +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/ExecutorStatusChore.java @@ -0,0 +1,86 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase; + +import com.google.common.annotations.VisibleForTesting; +import java.util.Map; +import org.apache.hadoop.hbase.classification.InterfaceAudience; +import org.apache.hadoop.hbase.executor.ExecutorService; +import org.apache.hadoop.hbase.executor.ExecutorService.ExecutorStatus; +import org.apache.hadoop.hbase.regionserver.MetricsRegionServerSource; +import org.apache.hadoop.hbase.regionserver.MetricsRegionServerSourceImpl; +import org.apache.hadoop.hbase.util.Pair; +import org.apache.hadoop.metrics2.lib.DynamicMetricsRegistry; +import org.apache.hadoop.metrics2.lib.MutableGaugeLong; +import org.apache.hadoop.util.StringUtils; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * The Class ExecutorStatusChore for collect Executor status info periodically + * and report to metrics system + */ +@InterfaceAudience.Private +public class ExecutorStatusChore extends ScheduledChore { + private static final Logger LOG = LoggerFactory.getLogger(HealthCheckChore.class); + public static final String WAKE_FREQ = "hbase.executors.status.collect.period"; + public static final int DEFAULT_WAKE_FREQ = 60000; + private ExecutorService service; + private DynamicMetricsRegistry metricsRegistry; + + public ExecutorStatusChore(int sleepTime, Stoppable stopper, ExecutorService service, + MetricsRegionServerSource metrics) { + super("ExecutorStatusChore", stopper, sleepTime); + LOG.info("ExecutorStatusChore runs every {} ", StringUtils.formatTime(sleepTime)); + this.service = service; + this.metricsRegistry = ((MetricsRegionServerSourceImpl) metrics).getMetricsRegistry(); + } + + @Override + protected void chore() { + try{ + // thread pool monitor + Map statuses = service.getAllExecutorStatuses(); + for (Map.Entry statusEntry : statuses.entrySet()) { + String name = statusEntry.getKey(); + // Executor's name is generate by ExecutorType#getExecutorName + // include ExecutorType & Servername(split by '-'), here we only need the ExecutorType + String poolName = name.split("-")[0]; + ExecutorStatus status = statusEntry.getValue(); + MutableGaugeLong queued = metricsRegistry.getGauge(poolName + "_queued", 0L); + MutableGaugeLong running = metricsRegistry.getGauge(poolName + "_running", 0L); + int queueSize = status.getQueuedEvents().size(); + int runningSize = status.getRunning().size(); + if (queueSize > 0) { + LOG.warn("{}'s size info, queued: {}, running: {}", poolName, queueSize, runningSize); + } + queued.set(queueSize); + running.set(runningSize); + } + } catch(Throwable e) { + LOG.error(e.getMessage(), e); + } + } + + @VisibleForTesting + public Pair getExecutorStatus(String poolName) { + MutableGaugeLong running = metricsRegistry.getGauge(poolName + "_running", 0L); + MutableGaugeLong queued = metricsRegistry.getGauge(poolName + "_queued", 0L); + return new Pair(running.value(), queued.value()); + } +} diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/executor/ExecutorService.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/executor/ExecutorService.java index 479184feb1e..c061d0ef558 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/executor/ExecutorService.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/executor/ExecutorService.java @@ -320,6 +320,14 @@ public class ExecutorService { this.running = running; } + public List getQueuedEvents() { + return queuedEvents; + } + + public List getRunning() { + return running; + } + /** * Dump a textual representation of the executor's status * to the given writer. diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java index cdcdb1efc67..cfd57a6885e 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java @@ -79,6 +79,7 @@ import org.apache.hadoop.hbase.ChoreService; import org.apache.hadoop.hbase.ClockOutOfSyncException; import org.apache.hadoop.hbase.CoordinatedStateManager; import org.apache.hadoop.hbase.CoordinatedStateManagerFactory; +import org.apache.hadoop.hbase.ExecutorStatusChore; import org.apache.hadoop.hbase.HBaseConfiguration; import org.apache.hadoop.hbase.HBaseInterfaceAudience; import org.apache.hadoop.hbase.HConstants; @@ -423,6 +424,9 @@ public class HRegionServer extends HasThread implements /** The health check chore. */ private HealthCheckChore healthCheckChore; + /** The Executor status collect chore. */ + private ExecutorStatusChore executorStatusChore; + /** The nonce manager chore. */ private ScheduledChore nonceManagerChore; @@ -1500,6 +1504,16 @@ public class HRegionServer extends HasThread implements pauseMonitor.start(); startServiceThreads(); + // start Executor status collect thread. can't do this in preRegistrationInitialization + // since MetricsRegionServer has not been instantiated + if (this.conf.getBoolean(HConstants.EXECUTOR_STATUS_COLLECT_ENABLED, + HConstants.DEFAULT_EXECUTOR_STATUS_COLLECT_ENABLED)) { + int sleepTime = this.conf.getInt(ExecutorStatusChore.WAKE_FREQ, + ExecutorStatusChore.DEFAULT_WAKE_FREQ); + executorStatusChore = new ExecutorStatusChore(sleepTime, this, this.getExecutorService(), + this.getRegionServerMetrics().getMetricsSource()); + } + startHeapMemoryManager(); LOG.info("Serving as " + this.serverName + ", RpcServer on " + rpcServices.isa + @@ -1854,13 +1868,27 @@ public class HRegionServer extends HasThread implements if (this.cacheFlusher != null) { this.cacheFlusher.start(uncaughtExceptionHandler); } - - if (this.compactionChecker != null) choreService.scheduleChore(compactionChecker); - if (this.periodicFlusher != null) choreService.scheduleChore(periodicFlusher); - if (this.healthCheckChore != null) choreService.scheduleChore(healthCheckChore); - if (this.nonceManagerChore != null) choreService.scheduleChore(nonceManagerChore); - if (this.storefileRefresher != null) choreService.scheduleChore(storefileRefresher); - if (this.movedRegionsCleaner != null) choreService.scheduleChore(movedRegionsCleaner); + if (this.compactionChecker != null) { + choreService.scheduleChore(compactionChecker); + } + if (this.periodicFlusher != null) { + choreService.scheduleChore(periodicFlusher); + } + if (this.healthCheckChore != null) { + choreService.scheduleChore(healthCheckChore); + } + if (this.executorStatusChore != null) { + choreService.scheduleChore(executorStatusChore); + } + if (this.nonceManagerChore != null) { + choreService.scheduleChore(nonceManagerChore); + } + if (this.storefileRefresher != null) { + choreService.scheduleChore(storefileRefresher); + } + if (this.movedRegionsCleaner != null) { + choreService.scheduleChore(movedRegionsCleaner); + } // Leases is not a Thread. Internally it runs a daemon thread. If it gets // an unhandled exception, it will just exit. @@ -2297,6 +2325,7 @@ public class HRegionServer extends HasThread implements choreService.cancelChore(compactionChecker); choreService.cancelChore(periodicFlusher); choreService.cancelChore(healthCheckChore); + choreService.cancelChore(executorStatusChore); choreService.cancelChore(storefileRefresher); choreService.cancelChore(movedRegionsCleaner); // clean up the remaining scheduled chores (in case we missed out any) diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/TestExecutorStatusChore.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/TestExecutorStatusChore.java new file mode 100644 index 00000000000..e0b4b46f8e9 --- /dev/null +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/TestExecutorStatusChore.java @@ -0,0 +1,97 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertTrue; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicInteger; +import org.apache.hadoop.hbase.executor.EventType; +import org.apache.hadoop.hbase.executor.ExecutorService; +import org.apache.hadoop.hbase.executor.ExecutorType; +import org.apache.hadoop.hbase.executor.TestExecutorService.TestEventHandler; +import org.apache.hadoop.hbase.regionserver.MetricsRegionServerSource; +import org.apache.hadoop.hbase.regionserver.MetricsRegionServerSourceFactory; +import org.apache.hadoop.hbase.regionserver.MetricsRegionServerSourceImpl; +import org.apache.hadoop.hbase.testclassification.MiscTests; +import org.apache.hadoop.hbase.testclassification.SmallTests; +import org.apache.hadoop.hbase.util.Pair; +import org.junit.Test; +import org.junit.experimental.categories.Category; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +@Category({MiscTests.class, SmallTests.class}) +public class TestExecutorStatusChore { + private static final Logger LOG = LoggerFactory.getLogger(TestExecutorStatusChore.class); + + @Test + public void testMetricsCollect() throws Exception { + int maxThreads = 5; + int maxTries = 10; + int sleepInterval = 10; + + Server mockedServer = mock(Server.class); + when(mockedServer.getConfiguration()).thenReturn(HBaseConfiguration.create()); + + // Start an executor service pool with max 5 threads + ExecutorService executorService = new ExecutorService("unit_test"); + executorService.startExecutorService( + ExecutorType.RS_PARALLEL_SEEK, maxThreads); + + MetricsRegionServerSource serverSource = CompatibilitySingletonFactory + .getInstance(MetricsRegionServerSourceFactory.class).createServer(null); + assertTrue(serverSource instanceof MetricsRegionServerSourceImpl); + + ExecutorStatusChore statusChore = new ExecutorStatusChore(60000, + mockedServer, executorService, serverSource); + + AtomicBoolean lock = new AtomicBoolean(true); + AtomicInteger counter = new AtomicInteger(0); + + for (int i = 0; i < maxThreads + 1; i++) { + executorService.submit(new TestEventHandler(mockedServer, + EventType.RS_PARALLEL_SEEK, lock, counter)); + } + + // The TestEventHandler will increment counter when it starts. + int tries = 0; + while (counter.get() < maxThreads && tries < maxTries) { + LOG.info("Waiting for all event handlers to start..."); + Thread.sleep(sleepInterval); + tries++; + } + + // Assert that pool is at max threads. + assertEquals(maxThreads, counter.get()); + + statusChore.chore(); + Pair executorStatus = statusChore.getExecutorStatus("RS_PARALLEL_SEEK"); + assertEquals(maxThreads, executorStatus.getFirst().intValue()); // running + assertEquals(1, executorStatus.getSecond().intValue()); // pending + + // Now interrupt the running Executor + synchronized (lock) { + lock.set(false); + lock.notifyAll(); + } + executorService.shutdown(); + } +}