HBASE-26080 Implement a new mini cluster class for end users (#3470)

Signed-off-by: Yulin Niu <niuyulin@apache.org>
This commit is contained in:
Duo Zhang 2021-07-13 15:14:13 +08:00 committed by GitHub
parent 1e763d521f
commit d1815445fd
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
4 changed files with 707 additions and 0 deletions

View File

@ -0,0 +1,141 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.testing;
import java.util.concurrent.CompletableFuture;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.ServerName;
import org.apache.yetus.audience.InterfaceAudience;
/**
* A mini hbase cluster used for testing.
* <p/>
* It will also start the necessary zookeeper cluster and dfs cluster. But we will not provide
* methods for controlling the zookeeper cluster and dfs cluster, as end users do not need to test
* the HBase behavior when these systems are broken.
* <p/>
* The implementation is not required to be thread safe, so do not call different methods
* concurrently.
*/
@InterfaceAudience.Public
public interface TestingHBaseCluster {
/**
* Get configuration of this cluster.
* <p/>
* You could use the returned {@link Configuration} to create
* {@link org.apache.hadoop.hbase.client.Connection} for accessing the testing cluster.
*/
Configuration getConf();
/**
* Start a new master with localhost and random port.
*/
void startMaster() throws Exception;
/**
* Start a new master bind on the given host and port.
*/
void startMaster(String hostname, int port) throws Exception;
/**
* Stop the given master.
* <p/>
* Wait on the returned {@link CompletableFuture} to wait on the master quit. The differences
* comparing to {@link org.apache.hadoop.hbase.client.Admin#stopMaster()} is that first, we could
* also stop backup masters here, second, this method does not always fail since we do not use rpc
* to stop the master.
*/
CompletableFuture<Void> stopMaster(ServerName serverName) throws Exception;
/**
* Start a new region server with localhost and random port.
*/
void startRegionServer() throws Exception;
/**
* Start a new region server bind on the given host and port.
*/
void startRegionServer(String hostname, int port) throws Exception;
/**
* Stop the given region server.
* <p/>
* Wait on the returned {@link CompletableFuture} to wait on the master quit. The difference
* comparing to {@link org.apache.hadoop.hbase.client.Admin#stopMaster()} is that this method does
* not always fail since we do not use rpc to stop the region server.
*/
CompletableFuture<Void> stopRegionServer(ServerName serverName) throws Exception;
/**
* Stop the hbase cluster.
* <p/>
* You need to call {@link #start()} first before calling this method, otherwise an
* {@link IllegalStateException} will be thrown. If the hbase is not running because you have
* already stopped the cluster, an {@link IllegalStateException} will be thrown too.
*/
void stopHBaseCluster() throws Exception;
/**
* Start the hbase cluster.
* <p/>
* This is used to start the hbase cluster again after you call {@link #stopHBaseCluster()}. If
* the cluster is already running or you have not called {@link #start()} yet, an
* {@link IllegalStateException} will be thrown.
*/
void startHBaseCluster() throws Exception;
/**
* Return whether the hbase cluster is running.
*/
boolean isHBaseClusterRunning();
/**
* Start the whole mini cluster, including zookeeper cluster, dfs cluster and hbase cluster.
* <p/>
* You can only call this method once at the beginning, unless you have called {@link #stop()} to
* shutdown the cluster completely, and then you can call this method to start the whole cluster
* again. An {@link IllegalStateException} will be thrown if you call this method incorrectly.
*/
void start() throws Exception;
/**
* Return whether the cluster is running.
* <p/>
* Notice that, this only means you have called {@link #start()} and have not called
* {@link #stop()} yet. If you want to make sure the hbase cluster is running, use
* {@link #isHBaseClusterRunning()}.
*/
boolean isClusterRunning();
/**
* Stop the whole mini cluster, including zookeeper cluster, dfs cluster and hbase cluster.
* <p/>
* You can only call this method after calling {@link #start()}, otherwise an
* {@link IllegalStateException} will be thrown.
*/
void stop() throws Exception;
/**
* Create a {@link TestingHBaseCluster}. You need to call {@link #start()} of the returned
* {@link TestingHBaseCluster} to actually start the mini testing cluster.
*/
static TestingHBaseCluster create(TestingHBaseClusterOption option) {
return new TestingHBaseClusterImpl(option);
}
}

View File

@ -0,0 +1,174 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.testing;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseTestingUtility;
import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.StartMiniClusterOption;
import org.apache.hadoop.hbase.util.JVMClusterUtil.MasterThread;
import org.apache.hadoop.hbase.util.JVMClusterUtil.RegionServerThread;
import org.apache.yetus.audience.InterfaceAudience;
import org.apache.hbase.thirdparty.com.google.common.base.Preconditions;
import org.apache.hbase.thirdparty.com.google.common.util.concurrent.ThreadFactoryBuilder;
@InterfaceAudience.Private
class TestingHBaseClusterImpl implements TestingHBaseCluster {
private final HBaseTestingUtility util = new HBaseTestingUtility();
private final StartMiniClusterOption option;
private final ExecutorService executor = Executors.newCachedThreadPool(new ThreadFactoryBuilder()
.setNameFormat(getClass().getSuperclass() + "-%d").setDaemon(true).build());
private boolean miniClusterRunning = false;
private boolean miniHBaseClusterRunning = false;
TestingHBaseClusterImpl(TestingHBaseClusterOption option) {
this.option = option.convert();
}
@Override
public Configuration getConf() {
return util.getConfiguration();
}
private int getRegionServerIndex(ServerName serverName) {
// we have a small number of region servers, this should be fine for now.
List<RegionServerThread> servers = util.getMiniHBaseCluster().getRegionServerThreads();
for (int i = 0; i < servers.size(); i++) {
if (servers.get(i).getRegionServer().getServerName().equals(serverName)) {
return i;
}
}
return -1;
}
private int getMasterIndex(ServerName serverName) {
List<MasterThread> masters = util.getMiniHBaseCluster().getMasterThreads();
for (int i = 0; i < masters.size(); i++) {
if (masters.get(i).getMaster().getServerName().equals(serverName)) {
return i;
}
}
return -1;
}
private void join(Thread thread, CompletableFuture<?> future) {
executor.execute(() -> {
try {
thread.join();
future.complete(null);
} catch (InterruptedException e) {
future.completeExceptionally(e);
}
});
}
@Override
public CompletableFuture<Void> stopMaster(ServerName serverName) throws Exception {
CompletableFuture<Void> future = new CompletableFuture<>();
int index = getMasterIndex(serverName);
if (index == -1) {
future.completeExceptionally(new IllegalArgumentException("Unknown master " + serverName));
}
join(util.getMiniHBaseCluster().stopMaster(index), future);
return future;
}
@Override
public CompletableFuture<Void> stopRegionServer(ServerName serverName) throws Exception {
CompletableFuture<Void> future = new CompletableFuture<>();
int index = getRegionServerIndex(serverName);
if (index == -1) {
future
.completeExceptionally(new IllegalArgumentException("Unknown region server " + serverName));
}
join(util.getMiniHBaseCluster().stopRegionServer(index), future);
return future;
}
@Override
public void stopHBaseCluster() throws Exception {
Preconditions.checkState(miniClusterRunning, "Cluster has already been stopped");
Preconditions.checkState(miniHBaseClusterRunning, "HBase cluster has already been started");
util.shutdownMiniHBaseCluster();
miniHBaseClusterRunning = false;
}
@Override
public void startHBaseCluster() throws Exception {
Preconditions.checkState(miniClusterRunning, "Cluster has already been stopped");
Preconditions.checkState(!miniHBaseClusterRunning, "HBase cluster has already been started");
util.startMiniHBaseCluster(option);
miniHBaseClusterRunning = true;
}
@Override
public void start() throws Exception {
Preconditions.checkState(!miniClusterRunning, "Cluster has already been started");
util.startMiniCluster(option);
miniClusterRunning = true;
miniHBaseClusterRunning = true;
}
@Override
public void stop() throws Exception {
Preconditions.checkState(miniClusterRunning, "Cluster has already been stopped");
util.shutdownMiniCluster();
miniClusterRunning = false;
miniHBaseClusterRunning = false;
}
@Override
public boolean isHBaseClusterRunning() {
return miniHBaseClusterRunning;
}
@Override
public boolean isClusterRunning() {
return miniClusterRunning;
}
@Override
public void startMaster() throws Exception {
util.getMiniHBaseCluster().startMaster();
}
@Override
public void startMaster(String hostname, int port) throws Exception {
util.getMiniHBaseCluster().startMaster(hostname, port);
}
@Override
public void startRegionServer() throws Exception {
util.getMiniHBaseCluster().startRegionServer();
}
@Override
public void startRegionServer(String hostname, int port) throws Exception {
util.getMiniHBaseCluster().startRegionServer(hostname, port);
}
}

View File

@ -0,0 +1,245 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.testing;
import java.util.Arrays;
import java.util.List;
import org.apache.commons.lang3.StringUtils;
import org.apache.hadoop.hbase.StartMiniClusterOption;
import org.apache.yetus.audience.InterfaceAudience;
/**
* Options for starting up a mini testing cluster {@link TestingHBaseCluster} (including an hbase,
* dfs and zookeeper clusters) in test. The options include HDFS options to build mini dfs cluster,
* Zookeeper options to build mini zk cluster, and mostly HBase options to build mini hbase cluster.
* To create an object, use a {@link Builder}. Example usage:
*
* <pre>
* TestingHBaseClusterOption option = TestingHBaseClusterOption.builder().
* .numMasters(3).createWALDir(true).build();
* </pre>
*
* Default values can be found in {@link Builder}.
*/
@InterfaceAudience.Public
public final class TestingHBaseClusterOption {
/**
* Number of masters to start up. We'll start this many hbase masters.
*/
private final int numMasters;
/**
* Number of masters that always remain standby. These set of masters never transition to active
* even if an active master does not exist.
*/
private final int numAlwaysStandByMasters;
/**
* Number of region servers to start up. If this value is > 1, then make sure config
* "hbase.regionserver.info.port" is -1 (i.e. no ui per regionserver) otherwise bind errors.
*/
private final int numRegionServers;
/**
* Ports that RegionServer should use. Pass ports if you want to test cluster restart where for
* sure the regionservers come up on same address+port (but just with different startcode); by
* default mini hbase clusters choose new arbitrary ports on each cluster start.
*/
private final List<Integer> rsPorts;
/**
* Number of datanodes. Used to create mini DSF cluster. Surpassed by {@link #dataNodeHosts} size.
*/
private final int numDataNodes;
/**
* The hostnames of DataNodes to run on. This is useful if you want to run datanode on distinct
* hosts for things like HDFS block location verification. If you start MiniDFSCluster without
* host names, all instances of the datanodes will have the same host name.
*/
private final String[] dataNodeHosts;
/**
* Number of Zookeeper servers.
*/
private final int numZkServers;
/**
* Whether to create a new root or data directory path. If true, the newly created data directory
* will be configured as HBase rootdir. This will overwrite existing root directory config.
*/
private final boolean createRootDir;
/**
* Whether to create a new WAL directory. If true, the newly created directory will be configured
* as HBase wal.dir which is separate from HBase rootdir.
*/
private final boolean createWALDir;
/**
* Private constructor. Use {@link Builder#build()}.
*/
private TestingHBaseClusterOption(int numMasters, int numAlwaysStandByMasters,
int numRegionServers, List<Integer> rsPorts, int numDataNodes, String[] dataNodeHosts,
int numZkServers, boolean createRootDir, boolean createWALDir) {
this.numMasters = numMasters;
this.numAlwaysStandByMasters = numAlwaysStandByMasters;
this.numRegionServers = numRegionServers;
this.rsPorts = rsPorts;
this.numDataNodes = numDataNodes;
this.dataNodeHosts = dataNodeHosts;
this.numZkServers = numZkServers;
this.createRootDir = createRootDir;
this.createWALDir = createWALDir;
}
public int getNumMasters() {
return numMasters;
}
public int getNumAlwaysStandByMasters() {
return numAlwaysStandByMasters;
}
public int getNumRegionServers() {
return numRegionServers;
}
public List<Integer> getRsPorts() {
return rsPorts;
}
public int getNumDataNodes() {
return numDataNodes;
}
public String[] getDataNodeHosts() {
return dataNodeHosts;
}
public int getNumZkServers() {
return numZkServers;
}
public boolean isCreateRootDir() {
return createRootDir;
}
public boolean isCreateWALDir() {
return createWALDir;
}
@Override
public String toString() {
return "StartMiniClusterOption{" + "numMasters=" + numMasters + ", numRegionServers=" +
numRegionServers + ", rsPorts=" + StringUtils.join(rsPorts) + ", numDataNodes=" +
numDataNodes + ", dataNodeHosts=" + Arrays.toString(dataNodeHosts) + ", numZkServers=" +
numZkServers + ", createRootDir=" + createRootDir + ", createWALDir=" + createWALDir + '}';
}
/**
* Convert to the internal option. Not for public use so package private.
*/
StartMiniClusterOption convert() {
return StartMiniClusterOption.builder().numMasters(numMasters)
.numAlwaysStandByMasters(numAlwaysStandByMasters).numRegionServers(numRegionServers)
.rsPorts(rsPorts).numDataNodes(numDataNodes).dataNodeHosts(dataNodeHosts)
.numZkServers(numZkServers).createRootDir(createRootDir).createWALDir(createWALDir).build();
}
/**
* Returns a new builder.
*/
public static Builder builder() {
return new Builder();
}
/**
* Builder pattern for creating an {@link TestingHBaseClusterOption}. The default values of its
* fields should be considered public and constant. Changing the default values may cause other
* tests fail.
*/
public static final class Builder {
private int numMasters = 1;
private int numAlwaysStandByMasters = 0;
private int numRegionServers = 1;
private List<Integer> rsPorts = null;
private int numDataNodes = 1;
private String[] dataNodeHosts = null;
private int numZkServers = 1;
private boolean createRootDir = false;
private boolean createWALDir = false;
private Builder() {
}
public TestingHBaseClusterOption build() {
if (dataNodeHosts != null && dataNodeHosts.length != 0) {
numDataNodes = dataNodeHosts.length;
}
return new TestingHBaseClusterOption(numMasters, numAlwaysStandByMasters, numRegionServers,
rsPorts, numDataNodes, dataNodeHosts, numZkServers, createRootDir, createWALDir);
}
public Builder numMasters(int numMasters) {
this.numMasters = numMasters;
return this;
}
public Builder numAlwaysStandByMasters(int numAlwaysStandByMasters) {
this.numAlwaysStandByMasters = numAlwaysStandByMasters;
return this;
}
public Builder numRegionServers(int numRegionServers) {
this.numRegionServers = numRegionServers;
return this;
}
public Builder rsPorts(List<Integer> rsPorts) {
this.rsPorts = rsPorts;
return this;
}
public Builder numDataNodes(int numDataNodes) {
this.numDataNodes = numDataNodes;
return this;
}
public Builder dataNodeHosts(String[] dataNodeHosts) {
this.dataNodeHosts = dataNodeHosts;
return this;
}
public Builder numZkServers(int numZkServers) {
this.numZkServers = numZkServers;
return this;
}
public Builder createRootDir(boolean createRootDir) {
this.createRootDir = createRootDir;
return this;
}
public Builder createWALDir(boolean createWALDir) {
this.createWALDir = createWALDir;
return this;
}
}
}

View File

@ -0,0 +1,147 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.testing;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertThrows;
import static org.junit.Assert.assertTrue;
import java.util.Collection;
import org.apache.hadoop.hbase.HBaseClassTestRule;
import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.Waiter;
import org.apache.hadoop.hbase.client.Admin;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.testclassification.LargeTests;
import org.apache.hadoop.hbase.testclassification.MiscTests;
import org.apache.hadoop.hbase.util.DNS;
import org.apache.hadoop.hbase.util.DNS.ServerType;
import org.junit.After;
import org.junit.AfterClass;
import org.junit.Before;
import org.junit.BeforeClass;
import org.junit.ClassRule;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.apache.hbase.thirdparty.com.google.common.collect.Iterables;
import org.apache.hbase.thirdparty.com.google.common.io.Closeables;
@Category({ MiscTests.class, LargeTests.class })
public class TestTestingHBaseCluster {
@ClassRule
public static final HBaseClassTestRule CLASS_RULE =
HBaseClassTestRule.forClass(TestTestingHBaseCluster.class);
private static TestingHBaseCluster CLUSTER;
private Connection conn;
private Admin admin;
@BeforeClass
public static void setUpBeforeClass() throws Exception {
CLUSTER = TestingHBaseCluster.create(TestingHBaseClusterOption.builder().numMasters(2)
.numRegionServers(3).numDataNodes(3).build());
}
@AfterClass
public static void tearDownAfterClass() throws Exception {
if (CLUSTER.isClusterRunning()) {
CLUSTER.stop();
}
}
@Before
public void setUp() throws Exception {
if (!CLUSTER.isClusterRunning()) {
CLUSTER.start();
}
if (!CLUSTER.isHBaseClusterRunning()) {
CLUSTER.startHBaseCluster();
}
conn = ConnectionFactory.createConnection(CLUSTER.getConf());
admin = conn.getAdmin();
}
@After
public void tearDown() throws Exception {
Closeables.close(admin, true);
Closeables.close(conn, true);
if (CLUSTER.isHBaseClusterRunning()) {
CLUSTER.stopHBaseCluster();
}
}
@Test
public void testStartStop() throws Exception {
assertTrue(CLUSTER.isClusterRunning());
assertTrue(CLUSTER.isHBaseClusterRunning());
assertThrows(IllegalStateException.class, () -> CLUSTER.start());
CLUSTER.stop();
assertFalse(CLUSTER.isClusterRunning());
assertFalse(CLUSTER.isHBaseClusterRunning());
assertThrows(IllegalStateException.class, () -> CLUSTER.stop());
}
@Test
public void testStartStopHBaseCluster() throws Exception {
assertTrue(CLUSTER.isHBaseClusterRunning());
assertThrows(IllegalStateException.class, () -> CLUSTER.startHBaseCluster());
CLUSTER.stopHBaseCluster();
assertTrue(CLUSTER.isClusterRunning());
assertFalse(CLUSTER.isHBaseClusterRunning());
assertThrows(IllegalStateException.class, () -> CLUSTER.stopHBaseCluster());
}
@Test
public void testStartStopMaster() throws Exception {
ServerName master = admin.getMaster();
CLUSTER.stopMaster(master).join();
// wait until the backup master becomes active master.
Waiter.waitFor(CLUSTER.getConf(), 30000, () -> {
try {
return admin.getMaster() != null;
} catch (Exception e) {
// ignore
return false;
}
});
// should have no backup master
assertTrue(admin.getBackupMasters().isEmpty());
CLUSTER.startMaster();
Waiter.waitFor(CLUSTER.getConf(), 30000, () -> !admin.getBackupMasters().isEmpty());
CLUSTER.startMaster(DNS.getHostname(CLUSTER.getConf(), ServerType.MASTER), 0);
Waiter.waitFor(CLUSTER.getConf(), 30000, () -> admin.getBackupMasters().size() == 2);
}
@Test
public void testStartStopRegionServer() throws Exception {
Collection<ServerName> regionServers = admin.getRegionServers();
assertEquals(3, regionServers.size());
CLUSTER.stopRegionServer(Iterables.get(regionServers, 0)).join();
Waiter.waitFor(CLUSTER.getConf(), 30000, () -> admin.getRegionServers().size() == 2);
CLUSTER.startRegionServer();
Waiter.waitFor(CLUSTER.getConf(), 30000, () -> admin.getRegionServers().size() == 3);
CLUSTER.startRegionServer(DNS.getHostname(CLUSTER.getConf(), ServerType.REGIONSERVER), 0);
Waiter.waitFor(CLUSTER.getConf(), 30000, () -> admin.getRegionServers().size() == 4);
}
}