From f49a86ae4f55fb8d3c0fceb60337d819a1737f63 Mon Sep 17 00:00:00 2001 From: openinx Date: Thu, 15 Aug 2019 10:57:42 +0800 Subject: [PATCH] HBASE-22810 Initialize an separate ThreadPoolExecutor for taking/restoring snapshot (#486) --- .../org/apache/hadoop/hbase/HConstants.java | 27 +++++++++++++++ .../hadoop/hbase/executor/EventType.java | 9 ++--- .../hadoop/hbase/executor/ExecutorType.java | 1 + .../apache/hadoop/hbase/master/HMaster.java | 29 +++++++++------- .../hbase/executor/TestExecutorService.java | 33 +++++++++++++++++++ 5 files changed, 80 insertions(+), 19 deletions(-) diff --git a/hbase-common/src/main/java/org/apache/hadoop/hbase/HConstants.java b/hbase-common/src/main/java/org/apache/hadoop/hbase/HConstants.java index 097e411d6e9..aea6328262c 100644 --- a/hbase-common/src/main/java/org/apache/hadoop/hbase/HConstants.java +++ b/hbase-common/src/main/java/org/apache/hadoop/hbase/HConstants.java @@ -1439,6 +1439,33 @@ public final class HConstants { "hbase.util.default.lossycounting.errorrate"; public static final String NOT_IMPLEMENTED = "Not implemented"; + /** + * Configurations for master executor services. + */ + public static final String MASTER_OPEN_REGION_THREADS = + "hbase.master.executor.openregion.threads"; + public static final int MASTER_OPEN_REGION_THREADS_DEFAULT = 5; + + public static final String MASTER_CLOSE_REGION_THREADS = + "hbase.master.executor.closeregion.threads"; + public static final int MASTER_CLOSE_REGION_THREADS_DEFAULT = 5; + + public static final String MASTER_SERVER_OPERATIONS_THREADS = + "hbase.master.executor.serverops.threads"; + public static final int MASTER_SERVER_OPERATIONS_THREADS_DEFAULT = 5; + + public static final String MASTER_META_SERVER_OPERATIONS_THREADS = + "hbase.master.executor.meta.serverops.threads"; + public static final int MASTER_META_SERVER_OPERATIONS_THREADS_DEFAULT = 5; + + public static final String MASTER_LOG_REPLAY_OPS_THREADS = + "hbase.master.executor.logreplayops.threads"; + public static final int MASTER_LOG_REPLAY_OPS_THREADS_DEFAULT = 10; + + public static final String MASTER_SNAPSHOT_OPERATIONS_THREADS = + "hbase.master.executor.snapshot.threads"; + public static final int MASTER_SNAPSHOT_OPERATIONS_THREADS_DEFAULT = 3; + private HConstants() { // Can't be instantiated with this ctor. } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/executor/EventType.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/executor/EventType.java index 922deb8769d..85d6ccbc0ec 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/executor/EventType.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/executor/EventType.java @@ -200,13 +200,13 @@ public enum EventType { * C_M_SNAPSHOT_TABLE
* Client asking Master to snapshot an offline table. */ - C_M_SNAPSHOT_TABLE (48, ExecutorType.MASTER_TABLE_OPERATIONS), + C_M_SNAPSHOT_TABLE (48, ExecutorType.MASTER_SNAPSHOT_OPERATIONS), /** * Messages originating from Client to Master.
* C_M_RESTORE_SNAPSHOT
* Client asking Master to restore a snapshot. */ - C_M_RESTORE_SNAPSHOT (49, ExecutorType.MASTER_TABLE_OPERATIONS), + C_M_RESTORE_SNAPSHOT (49, ExecutorType.MASTER_SNAPSHOT_OPERATIONS), // Updates from master to ZK. This is done by the master and there is // nothing to process by either Master or RS @@ -308,11 +308,6 @@ public enum EventType { throw new IllegalArgumentException("Unknown code " + code); } - public boolean isOnlineSchemaChangeSupported() { - return this.equals(EventType.C_M_ADD_FAMILY) || this.equals(EventType.C_M_DELETE_FAMILY) || - this.equals(EventType.C_M_MODIFY_FAMILY) || this.equals(EventType.C_M_MODIFY_TABLE); - } - ExecutorType getExecutorServiceType() { return this.executor; } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/executor/ExecutorType.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/executor/ExecutorType.java index 596385d0caa..66baccb95fd 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/executor/ExecutorType.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/executor/ExecutorType.java @@ -34,6 +34,7 @@ public enum ExecutorType { MASTER_RS_SHUTDOWN (5), MASTER_META_SERVER_OPERATIONS (6), M_LOG_REPLAY_OPS (7), + MASTER_SNAPSHOT_OPERATIONS (8), // RegionServer executor services RS_OPEN_REGION (20), diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/HMaster.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/HMaster.java index 5a7eadb5f10..d35f4677f02 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/HMaster.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/HMaster.java @@ -1335,18 +1335,23 @@ public class HMaster extends HRegionServer implements MasterServices { * as OOMEs; it should be lightly loaded. See what HRegionServer does if * need to install an unexpected exception handler. */ - private void startServiceThreads() throws IOException{ - // Start the executor service pools - this.executorService.startExecutorService(ExecutorType.MASTER_OPEN_REGION, - conf.getInt("hbase.master.executor.openregion.threads", 5)); - this.executorService.startExecutorService(ExecutorType.MASTER_CLOSE_REGION, - conf.getInt("hbase.master.executor.closeregion.threads", 5)); - this.executorService.startExecutorService(ExecutorType.MASTER_SERVER_OPERATIONS, - conf.getInt("hbase.master.executor.serverops.threads", 5)); - this.executorService.startExecutorService(ExecutorType.MASTER_META_SERVER_OPERATIONS, - conf.getInt("hbase.master.executor.meta.serverops.threads", 5)); - this.executorService.startExecutorService(ExecutorType.M_LOG_REPLAY_OPS, - conf.getInt("hbase.master.executor.logreplayops.threads", 10)); + private void startServiceThreads() throws IOException { + // Start the executor service pools + this.executorService.startExecutorService(ExecutorType.MASTER_OPEN_REGION, conf.getInt( + HConstants.MASTER_OPEN_REGION_THREADS, HConstants.MASTER_OPEN_REGION_THREADS_DEFAULT)); + this.executorService.startExecutorService(ExecutorType.MASTER_CLOSE_REGION, conf.getInt( + HConstants.MASTER_CLOSE_REGION_THREADS, HConstants.MASTER_CLOSE_REGION_THREADS_DEFAULT)); + this.executorService.startExecutorService(ExecutorType.MASTER_SERVER_OPERATIONS, + conf.getInt(HConstants.MASTER_SERVER_OPERATIONS_THREADS, + HConstants.MASTER_SERVER_OPERATIONS_THREADS_DEFAULT)); + this.executorService.startExecutorService(ExecutorType.MASTER_META_SERVER_OPERATIONS, + conf.getInt(HConstants.MASTER_META_SERVER_OPERATIONS_THREADS, + HConstants.MASTER_META_SERVER_OPERATIONS_THREADS_DEFAULT)); + this.executorService.startExecutorService(ExecutorType.M_LOG_REPLAY_OPS, conf.getInt( + HConstants.MASTER_LOG_REPLAY_OPS_THREADS, HConstants.MASTER_LOG_REPLAY_OPS_THREADS_DEFAULT)); + this.executorService.startExecutorService(ExecutorType.MASTER_SNAPSHOT_OPERATIONS, + conf.getInt(HConstants.MASTER_SNAPSHOT_OPERATIONS_THREADS, + HConstants.MASTER_SNAPSHOT_OPERATIONS_THREADS_DEFAULT)); // We depend on there being only one instance of this executor running // at a time. To do concurrency, would need fencing of enable/disable of diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/executor/TestExecutorService.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/executor/TestExecutorService.java index f6e9409a515..205c6c6f090 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/executor/TestExecutorService.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/executor/TestExecutorService.java @@ -28,6 +28,7 @@ import static org.mockito.Mockito.when; import java.io.IOException; import java.io.StringWriter; +import java.util.concurrent.CountDownLatch; import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicInteger; @@ -41,6 +42,7 @@ import org.apache.hadoop.hbase.executor.ExecutorService.Executor; import org.apache.hadoop.hbase.executor.ExecutorService.ExecutorStatus; import org.apache.hadoop.hbase.testclassification.MiscTests; import org.apache.hadoop.hbase.testclassification.SmallTests; +import org.junit.Assert; import org.junit.ClassRule; import org.junit.Test; import org.junit.experimental.categories.Category; @@ -219,5 +221,36 @@ public class TestExecutorService { executorService.shutdown(); } + @Test + public void testSnapshotHandlers() throws Exception { + final Configuration conf = HBaseConfiguration.create(); + final Server server = mock(Server.class); + when(server.getConfiguration()).thenReturn(conf); + + ExecutorService executorService = new ExecutorService("testSnapshotHandlers"); + executorService.startExecutorService(ExecutorType.MASTER_SNAPSHOT_OPERATIONS, 1); + + CountDownLatch latch = new CountDownLatch(1); + executorService.submit(new EventHandler(server, EventType.C_M_SNAPSHOT_TABLE) { + @Override + public void process() throws IOException { + try { + latch.await(); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + } + } + }); + + int activeCount = executorService.getExecutor(ExecutorType.MASTER_SNAPSHOT_OPERATIONS) + .getThreadPoolExecutor().getActiveCount(); + Assert.assertEquals(activeCount, 1); + latch.countDown(); + Waiter.waitFor(conf, 3000, () -> { + int count = executorService.getExecutor(ExecutorType.MASTER_SNAPSHOT_OPERATIONS) + .getThreadPoolExecutor().getActiveCount(); + return count == 0; + }); + } }