From be2de08c12e18c0e3b979483b3f33fdc3a972af5 Mon Sep 17 00:00:00 2001 From: huzheng Date: Wed, 21 Aug 2019 20:53:27 +0800 Subject: [PATCH] HBASE-22810 Initialize an separate ThreadPoolExecutor for taking/restoring snapshot (addendum - use the old config key) --- .../org/apache/hadoop/hbase/HConstants.java | 4 - .../apache/hadoop/hbase/master/HMaster.java | 5 +- .../snapshot/EnabledTableSnapshotHandler.java | 7 +- .../master/snapshot/SnapshotManager.java | 4 +- ...TestConcurrentFlushSnapshotFromClient.java | 44 +++++++ .../snapshot/TestFlushSnapshotFromClient.java | 111 +----------------- 6 files changed, 53 insertions(+), 122 deletions(-) create mode 100644 hbase-server/src/test/java/org/apache/hadoop/hbase/snapshot/TestConcurrentFlushSnapshotFromClient.java diff --git a/hbase-common/src/main/java/org/apache/hadoop/hbase/HConstants.java b/hbase-common/src/main/java/org/apache/hadoop/hbase/HConstants.java index e30856c93aa..723d70b1b03 100644 --- a/hbase-common/src/main/java/org/apache/hadoop/hbase/HConstants.java +++ b/hbase-common/src/main/java/org/apache/hadoop/hbase/HConstants.java @@ -1363,10 +1363,6 @@ public final class HConstants { "hbase.master.executor.logreplayops.threads"; public static final int MASTER_LOG_REPLAY_OPS_THREADS_DEFAULT = 10; - public static final String MASTER_SNAPSHOT_OPERATIONS_THREADS = - "hbase.master.executor.snapshot.threads"; - public static final int MASTER_SNAPSHOT_OPERATIONS_THREADS_DEFAULT = 3; - private HConstants() { // Can't be instantiated with this ctor. } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/HMaster.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/HMaster.java index 07d4ff8f23f..f319ee4e743 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/HMaster.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/HMaster.java @@ -1226,9 +1226,8 @@ public class HMaster extends HRegionServer implements MasterServices, Server { HConstants.MASTER_META_SERVER_OPERATIONS_THREADS_DEFAULT)); this.service.startExecutorService(ExecutorType.M_LOG_REPLAY_OPS, conf.getInt( HConstants.MASTER_LOG_REPLAY_OPS_THREADS, HConstants.MASTER_LOG_REPLAY_OPS_THREADS_DEFAULT)); - this.service.startExecutorService(ExecutorType.MASTER_SNAPSHOT_OPERATIONS, - conf.getInt(HConstants.MASTER_SNAPSHOT_OPERATIONS_THREADS, - HConstants.MASTER_SNAPSHOT_OPERATIONS_THREADS_DEFAULT)); + this.service.startExecutorService(ExecutorType.MASTER_SNAPSHOT_OPERATIONS, conf.getInt( + SnapshotManager.SNAPSHOT_POOL_THREADS_KEY, SnapshotManager.SNAPSHOT_POOL_THREADS_DEFAULT)); // We depend on there being only one instance of this executor running // at a time. To do concurrency, would need fencing of enable/disable of diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/snapshot/EnabledTableSnapshotHandler.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/snapshot/EnabledTableSnapshotHandler.java index f424c38e6ae..899066d2154 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/snapshot/EnabledTableSnapshotHandler.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/snapshot/EnabledTableSnapshotHandler.java @@ -65,12 +65,11 @@ public class EnabledTableSnapshotHandler extends TakeSnapshotHandler { // enforce a snapshot time constraints, but lets us be potentially a bit more robust. /** - * This method kicks off a snapshot procedure. Other than that it hangs around for various - * phases to complete. + * This method kicks off a snapshot procedure. Other than that it hangs around for various phases + * to complete. */ @Override - protected void snapshotRegions(List> regions) - throws HBaseSnapshotException, IOException { + protected void snapshotRegions(List> regions) throws IOException { Set regionServers = new HashSet(regions.size()); for (Pair region : regions) { if (region != null && region.getFirst() != null && region.getSecond() != null) { diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/snapshot/SnapshotManager.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/snapshot/SnapshotManager.java index e0627acb9d9..fff0ec993d5 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/snapshot/SnapshotManager.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/snapshot/SnapshotManager.java @@ -137,10 +137,10 @@ public class SnapshotManager extends MasterProcedureManager implements Stoppable public static final String ONLINE_SNAPSHOT_CONTROLLER_DESCRIPTION = "online-snapshot"; /** Conf key for # of threads used by the SnapshotManager thread pool */ - private static final String SNAPSHOT_POOL_THREADS_KEY = "hbase.snapshot.master.threads"; + public static final String SNAPSHOT_POOL_THREADS_KEY = "hbase.snapshot.master.threads"; /** number of current operations running on the master */ - private static final int SNAPSHOT_POOL_THREADS_DEFAULT = 1; + public static final int SNAPSHOT_POOL_THREADS_DEFAULT = 1; private boolean stopped; private MasterServices master; // Needed by TableEventHandlers diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/snapshot/TestConcurrentFlushSnapshotFromClient.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/snapshot/TestConcurrentFlushSnapshotFromClient.java new file mode 100644 index 00000000000..80943cddc44 --- /dev/null +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/snapshot/TestConcurrentFlushSnapshotFromClient.java @@ -0,0 +1,44 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.snapshot; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hbase.master.snapshot.SnapshotManager; +import org.apache.hadoop.hbase.testclassification.ClientTests; +import org.apache.hadoop.hbase.testclassification.LargeTests; +import org.junit.BeforeClass; +import org.junit.experimental.categories.Category; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +@Category({ ClientTests.class, LargeTests.class }) +public class TestConcurrentFlushSnapshotFromClient extends TestFlushSnapshotFromClient { + private static final Logger LOG = LoggerFactory.getLogger(TestFlushSnapshotFromClient.class); + + @BeforeClass + public static void setupCluster() throws Exception { + setupConf(UTIL.getConfiguration()); + UTIL.startMiniCluster(3); + } + + protected static void setupConf(Configuration conf) { + TestFlushSnapshotFromClient.setupConf(conf); + UTIL.getConfiguration().setInt(SnapshotManager.SNAPSHOT_POOL_THREADS_KEY, 3); + LOG.info("Config the {} to be 3", SnapshotManager.SNAPSHOT_POOL_THREADS_KEY); + } +} diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/snapshot/TestFlushSnapshotFromClient.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/snapshot/TestFlushSnapshotFromClient.java index 667c0156370..6a6bad8728a 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/snapshot/TestFlushSnapshotFromClient.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/snapshot/TestFlushSnapshotFromClient.java @@ -18,7 +18,6 @@ package org.apache.hadoop.hbase.snapshot; import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertTrue; import static org.junit.Assert.fail; import java.io.IOException; @@ -27,7 +26,6 @@ import java.util.Comparator; import java.util.HashMap; import java.util.List; import java.util.Map; -import java.util.concurrent.CountDownLatch; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; @@ -65,7 +63,7 @@ import org.junit.experimental.categories.Category; @Category(LargeTests.class) public class TestFlushSnapshotFromClient { private static final Log LOG = LogFactory.getLog(TestFlushSnapshotFromClient.class); - private static final HBaseTestingUtility UTIL = new HBaseTestingUtility(); + protected static final HBaseTestingUtility UTIL = new HBaseTestingUtility(); private static final int NUM_RS = 2; private static final byte[] TEST_FAM = Bytes.toBytes("fam"); private static final TableName TABLE_NAME = TableName.valueOf("test"); @@ -86,7 +84,7 @@ public class TestFlushSnapshotFromClient { UTIL.startMiniCluster(NUM_RS); } - private static void setupConf(Configuration conf) { + protected static void setupConf(Configuration conf) { // disable the ui conf.setInt("hbase.regionsever.info.port", -1); // change the flush size to a small amount, regulating number of store files @@ -400,111 +398,6 @@ public class TestFlushSnapshotFromClient { snapshotName, rootDir, fs, true); } - /** - * Demonstrate that we reject snapshot requests if there is a snapshot already running on the - * same table currently running and that concurrent snapshots on different tables can both - * succeed concurretly. - */ - @Test(timeout=300000) - public void testConcurrentSnapshottingAttempts() throws IOException, InterruptedException { - final TableName TABLE2_NAME = TableName.valueOf(TABLE_NAME + "2"); - - int ssNum = 20; - Admin admin = UTIL.getHBaseAdmin(); - // make sure we don't fail on listing snapshots - SnapshotTestingUtils.assertNoSnapshots(admin); - // create second testing table - SnapshotTestingUtils.createTable(UTIL, TABLE2_NAME, TEST_FAM); - // load the table so we have some data - SnapshotTestingUtils.loadData(UTIL, TABLE_NAME, DEFAULT_NUM_ROWS, TEST_FAM); - SnapshotTestingUtils.loadData(UTIL, TABLE2_NAME, DEFAULT_NUM_ROWS, TEST_FAM); - - final CountDownLatch toBeSubmitted = new CountDownLatch(ssNum); - // We'll have one of these per thread - class SSRunnable implements Runnable { - SnapshotDescription ss; - SSRunnable(SnapshotDescription ss) { - this.ss = ss; - } - - @Override - public void run() { - try { - Admin admin = UTIL.getHBaseAdmin(); - LOG.info("Submitting snapshot request: " + ClientSnapshotDescriptionUtils.toString(ss)); - admin.takeSnapshotAsync(ss); - } catch (Exception e) { - LOG.info("Exception during snapshot request: " + ClientSnapshotDescriptionUtils.toString( - ss) - + ". This is ok, we expect some", e); - } - LOG.info("Submitted snapshot request: " + ClientSnapshotDescriptionUtils.toString(ss)); - toBeSubmitted.countDown(); - } - }; - - // build descriptions - SnapshotDescription[] descs = new SnapshotDescription[ssNum]; - for (int i = 0; i < ssNum; i++) { - SnapshotDescription.Builder builder = SnapshotDescription.newBuilder(); - builder.setTable(((i % 2) == 0 ? TABLE_NAME : TABLE2_NAME).getNameAsString()); - builder.setName("ss"+i); - builder.setType(SnapshotDescription.Type.FLUSH); - descs[i] = builder.build(); - } - - // kick each off its own thread - for (int i=0 ; i < ssNum; i++) { - new Thread(new SSRunnable(descs[i])).start(); - } - - // wait until all have been submitted - toBeSubmitted.await(); - - // loop until all are done. - while (true) { - int doneCount = 0; - for (SnapshotDescription ss : descs) { - try { - if (admin.isSnapshotFinished(ss)) { - doneCount++; - } - } catch (Exception e) { - LOG.warn("Got an exception when checking for snapshot " + ss.getName(), e); - doneCount++; - } - } - if (doneCount == descs.length) { - break; - } - Thread.sleep(100); - } - - // dump for debugging - UTIL.getHBaseCluster().getMaster().getMasterFileSystem().logFileSystemState(LOG); - - List taken = admin.listSnapshots(); - int takenSize = taken.size(); - LOG.info("Taken " + takenSize + " snapshots: " + taken); - assertTrue("We expect at least 1 request to be rejected because of we concurrently" + - " issued many requests", takenSize < ssNum && takenSize > 0); - - // Verify that there's at least one snapshot per table - int t1SnapshotsCount = 0; - int t2SnapshotsCount = 0; - for (SnapshotDescription ss : taken) { - if (TableName.valueOf(ss.getTable()).equals(TABLE_NAME)) { - t1SnapshotsCount++; - } else if (TableName.valueOf(ss.getTable()).equals(TABLE2_NAME)) { - t2SnapshotsCount++; - } - } - assertTrue("We expect at least 1 snapshot of table1 ", t1SnapshotsCount > 0); - assertTrue("We expect at least 1 snapshot of table2 ", t2SnapshotsCount > 0); - - UTIL.deleteTable(TABLE2_NAME); - } - private void waitRegionsAfterMerge(final long numRegionsAfterMerge) throws IOException, InterruptedException { Admin admin = UTIL.getHBaseAdmin();