diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/executor/EventType.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/executor/EventType.java index 9b7751d48a3..7dcc08ec484 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/executor/EventType.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/executor/EventType.java @@ -203,13 +203,13 @@ public enum EventType { * C_M_SNAPSHOT_TABLE
* Client asking Master to snapshot an offline table. */ - C_M_SNAPSHOT_TABLE (48, ExecutorType.MASTER_TABLE_OPERATIONS), + C_M_SNAPSHOT_TABLE (48, ExecutorType.MASTER_SNAPSHOT_OPERATIONS), /** * Messages originating from Client to Master.
* C_M_RESTORE_SNAPSHOT
* Client asking Master to restore a snapshot. */ - C_M_RESTORE_SNAPSHOT (49, ExecutorType.MASTER_TABLE_OPERATIONS), + C_M_RESTORE_SNAPSHOT (49, ExecutorType.MASTER_SNAPSHOT_OPERATIONS), // Updates from master to ZK. This is done by the master and there is // nothing to process by either Master or RS diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/executor/ExecutorType.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/executor/ExecutorType.java index e9b0ad58c57..36b9a2cf66b 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/executor/ExecutorType.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/executor/ExecutorType.java @@ -36,6 +36,7 @@ public enum ExecutorType { MASTER_RS_SHUTDOWN (5), MASTER_META_SERVER_OPERATIONS (6), M_LOG_REPLAY_OPS (7), + MASTER_SNAPSHOT_OPERATIONS (8), // RegionServer executor services RS_OPEN_REGION (20), diff --git a/hbase-common/src/main/java/org/apache/hadoop/hbase/HConstants.java b/hbase-common/src/main/java/org/apache/hadoop/hbase/HConstants.java index 5b7ae8acc48..e30856c93aa 100644 --- a/hbase-common/src/main/java/org/apache/hadoop/hbase/HConstants.java +++ b/hbase-common/src/main/java/org/apache/hadoop/hbase/HConstants.java @@ -1340,6 +1340,33 @@ public final class HConstants { public static final String SNAPSHOT_CLEANER_DISABLE = "hbase.master.cleaner.snapshot.disable"; + /** + * Configurations for master executor services. + */ + public static final String MASTER_OPEN_REGION_THREADS = + "hbase.master.executor.openregion.threads"; + public static final int MASTER_OPEN_REGION_THREADS_DEFAULT = 5; + + public static final String MASTER_CLOSE_REGION_THREADS = + "hbase.master.executor.closeregion.threads"; + public static final int MASTER_CLOSE_REGION_THREADS_DEFAULT = 5; + + public static final String MASTER_SERVER_OPERATIONS_THREADS = + "hbase.master.executor.serverops.threads"; + public static final int MASTER_SERVER_OPERATIONS_THREADS_DEFAULT = 5; + + public static final String MASTER_META_SERVER_OPERATIONS_THREADS = + "hbase.master.executor.meta.serverops.threads"; + public static final int MASTER_META_SERVER_OPERATIONS_THREADS_DEFAULT = 5; + + public static final String MASTER_LOG_REPLAY_OPS_THREADS = + "hbase.master.executor.logreplayops.threads"; + public static final int MASTER_LOG_REPLAY_OPS_THREADS_DEFAULT = 10; + + public static final String MASTER_SNAPSHOT_OPERATIONS_THREADS = + "hbase.master.executor.snapshot.threads"; + public static final int MASTER_SNAPSHOT_OPERATIONS_THREADS_DEFAULT = 3; + private HConstants() { // Can't be instantiated with this ctor. } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/HMaster.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/HMaster.java index 243f448860b..07d4ff8f23f 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/HMaster.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/HMaster.java @@ -1212,18 +1212,23 @@ public class HMaster extends HRegionServer implements MasterServices, Server { * as OOMEs; it should be lightly loaded. See what HRegionServer does if * need to install an unexpected exception handler. */ - private void startServiceThreads() throws IOException{ - // Start the executor service pools - this.service.startExecutorService(ExecutorType.MASTER_OPEN_REGION, - conf.getInt("hbase.master.executor.openregion.threads", 5)); - this.service.startExecutorService(ExecutorType.MASTER_CLOSE_REGION, - conf.getInt("hbase.master.executor.closeregion.threads", 5)); - this.service.startExecutorService(ExecutorType.MASTER_SERVER_OPERATIONS, - conf.getInt("hbase.master.executor.serverops.threads", 5)); - this.service.startExecutorService(ExecutorType.MASTER_META_SERVER_OPERATIONS, - conf.getInt("hbase.master.executor.serverops.threads", 5)); - this.service.startExecutorService(ExecutorType.M_LOG_REPLAY_OPS, - conf.getInt("hbase.master.executor.logreplayops.threads", 10)); + private void startServiceThreads() throws IOException { + // Start the executor service pools + this.service.startExecutorService(ExecutorType.MASTER_OPEN_REGION, conf.getInt( + HConstants.MASTER_OPEN_REGION_THREADS, HConstants.MASTER_OPEN_REGION_THREADS_DEFAULT)); + this.service.startExecutorService(ExecutorType.MASTER_CLOSE_REGION, conf.getInt( + HConstants.MASTER_CLOSE_REGION_THREADS, HConstants.MASTER_CLOSE_REGION_THREADS_DEFAULT)); + this.service.startExecutorService(ExecutorType.MASTER_SERVER_OPERATIONS, + conf.getInt(HConstants.MASTER_SERVER_OPERATIONS_THREADS, + HConstants.MASTER_SERVER_OPERATIONS_THREADS_DEFAULT)); + this.service.startExecutorService(ExecutorType.MASTER_META_SERVER_OPERATIONS, + conf.getInt(HConstants.MASTER_META_SERVER_OPERATIONS_THREADS, + HConstants.MASTER_META_SERVER_OPERATIONS_THREADS_DEFAULT)); + this.service.startExecutorService(ExecutorType.M_LOG_REPLAY_OPS, conf.getInt( + HConstants.MASTER_LOG_REPLAY_OPS_THREADS, HConstants.MASTER_LOG_REPLAY_OPS_THREADS_DEFAULT)); + this.service.startExecutorService(ExecutorType.MASTER_SNAPSHOT_OPERATIONS, + conf.getInt(HConstants.MASTER_SNAPSHOT_OPERATIONS_THREADS, + HConstants.MASTER_SNAPSHOT_OPERATIONS_THREADS_DEFAULT)); // We depend on there being only one instance of this executor running // at a time. To do concurrency, would need fencing of enable/disable of diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/executor/TestExecutorService.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/executor/TestExecutorService.java index b9724f3cde4..24e46dfd248 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/executor/TestExecutorService.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/executor/TestExecutorService.java @@ -21,6 +21,7 @@ package org.apache.hadoop.hbase.executor; import static org.junit.Assert.*; import java.io.IOException; import java.io.StringWriter; +import java.util.concurrent.CountDownLatch; import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicInteger; @@ -33,6 +34,7 @@ import org.apache.hadoop.hbase.Waiter.Predicate; import org.apache.hadoop.hbase.executor.ExecutorService.Executor; import org.apache.hadoop.hbase.executor.ExecutorService.ExecutorStatus; import org.apache.hadoop.hbase.testclassification.SmallTests; +import org.junit.Assert; import org.junit.Test; import org.junit.experimental.categories.Category; @@ -207,5 +209,39 @@ public class TestExecutorService { executorService.shutdown(); } + @Test + public void testSnapshotHandlers() throws Exception { + final Configuration conf = HBaseConfiguration.create(); + final Server server = mock(Server.class); + when(server.getConfiguration()).thenReturn(conf); + + final ExecutorService executorService = new ExecutorService("testSnapshotHandlers"); + executorService.startExecutorService(ExecutorType.MASTER_SNAPSHOT_OPERATIONS, 1); + + final CountDownLatch latch = new CountDownLatch(1); + executorService.submit(new EventHandler(server, EventType.C_M_SNAPSHOT_TABLE) { + @Override + public void process() throws IOException { + try { + latch.await(); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + } + } + }); + + int activeCount = executorService.getExecutor(ExecutorType.MASTER_SNAPSHOT_OPERATIONS) + .getThreadPoolExecutor().getActiveCount(); + Assert.assertEquals(activeCount, 1); + latch.countDown(); + Waiter.waitFor(conf, 3000, new Predicate() { + @Override + public boolean evaluate() throws Exception { + int count = executorService.getExecutor(ExecutorType.MASTER_SNAPSHOT_OPERATIONS) + .getThreadPoolExecutor().getActiveCount(); + return count == 0; + } + }); + } }