diff --git a/bin/hbase b/bin/hbase index 02944f9ee69..05f1670ac5d 100755 --- a/bin/hbase +++ b/bin/hbase @@ -349,7 +349,7 @@ fi # Exec unless HBASE_NOEXEC is set. if [ "${HBASE_NOEXEC}" != "" ]; then - "$JAVA" -XX:OnOutOfMemoryError="kill -9 %p" $JAVA_HEAP_MAX $HBASE_OPTS -classpath "$CLASSPATH" $CLASS "$@" + "$JAVA" -Dproc_$COMMAND -XX:OnOutOfMemoryError="kill -9 %p" $JAVA_HEAP_MAX $HBASE_OPTS -classpath "$CLASSPATH" $CLASS "$@" else - exec "$JAVA" -XX:OnOutOfMemoryError="kill -9 %p" $JAVA_HEAP_MAX $HBASE_OPTS -classpath "$CLASSPATH" $CLASS "$@" + exec "$JAVA" -Dproc_$COMMAND -XX:OnOutOfMemoryError="kill -9 %p" $JAVA_HEAP_MAX $HBASE_OPTS -classpath "$CLASSPATH" $CLASS "$@" fi diff --git a/hbase-it/pom.xml b/hbase-it/pom.xml index f9a123570a7..e9e477fe488 100644 --- a/hbase-it/pom.xml +++ b/hbase-it/pom.xml @@ -38,6 +38,12 @@ + <testResources> + <testResource> + <directory>../hbase-server/src/test/resources</directory> + </testResource> + </testResources> + @@ -52,7 +58,7 @@ - + <include>${integrationtest.include}</include> @@ -66,16 +72,20 @@ <DYLD_LIBRARY_PATH>${env.DYLD_LIBRARY_PATH}:${project.build.directory}/nativelib</DYLD_LIBRARY_PATH> 4 + <failIfNoTests>false</failIfNoTests> + <testFailureIgnore>false</testFailureIgnore> <id>integration-test</id> + <phase>integration-test</phase> <goal>integration-test</goal> <id>verify</id> + <phase>verify</phase> <goal>verify</goal> @@ -93,6 +103,12 @@ false always + + <forkedProcessTimeoutInSeconds>1800</forkedProcessTimeoutInSeconds> + <argLine>-enableassertions -Xmx1900m + -Djava.security.egd=file:/dev/./urandom</argLine> + <failIfNoTests>false</failIfNoTests> @@ -100,7 +116,7 @@ - + <groupId>org.apache.hbase</groupId> <artifactId>hbase-server</artifactId> diff --git a/hbase-it/src/test/java/org/apache/hadoop/hbase/ClusterManager.java b/hbase-it/src/test/java/org/apache/hadoop/hbase/ClusterManager.java new file mode 100644 index 00000000000..5a5b010959b --- /dev/null +++ b/hbase-it/src/test/java/org/apache/hadoop/hbase/ClusterManager.java @@ -0,0 +1,132 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.hbase; + +import java.io.IOException; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.classification.InterfaceAudience; +import org.apache.hadoop.conf.Configured; + + +/** + * ClusterManager is an API to manage servers in a distributed environment. It provides services + * for starting / stopping / killing Hadoop/HBase daemons. Concrete implementations provide actual + * functionality for carrying out deployment-specific tasks.
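+ * <p> + * For illustration only (the hostname below is hypothetical, not part of this patch's tests), a + * test could bounce a region server through this API roughly as follows: + * <pre> + *   ClusterManager manager = new HBaseClusterManager(); + *   manager.restart(ServiceType.HBASE_REGIONSERVER, "rs1.example.com"); + *   if (!manager.isRunning(ServiceType.HBASE_REGIONSERVER, "rs1.example.com")) { + *     throw new IOException("region server did not come back up"); + *   } + * </pre>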
+ */ +@InterfaceAudience.Private +public abstract class ClusterManager extends Configured { + protected static final Log LOG = LogFactory.getLog(ClusterManager.class); + + private static final String SIGKILL = "SIGKILL"; + private static final String SIGSTOP = "SIGSTOP"; + private static final String SIGCONT = "SIGCONT"; + + public ClusterManager() { + } + + /** + * Type of the service daemon + */ + public static enum ServiceType { + HADOOP_NAMENODE("namenode"), + HADOOP_DATANODE("datanode"), + HADOOP_JOBTRACKER("jobtracker"), + HADOOP_TASKTRACKER("tasktracker"), + HBASE_MASTER("master"), + HBASE_REGIONSERVER("regionserver"); + + private String name; + + ServiceType(String name) { + this.name = name; + } + + public String getName() { + return name; + } + + @Override + public String toString() { + return getName(); + } + } + + /** + * Start the service on the given host + */ + public abstract void start(ServiceType service, String hostname) throws IOException; + + /** + * Stop the service on the given host + */ + public abstract void stop(ServiceType service, String hostname) throws IOException; + + /** + * Restart the service on the given host + */ + public abstract void restart(ServiceType service, String hostname) throws IOException; + + /** + * Send the given POSIX signal to the service + */ + public abstract void signal(ServiceType service, String signal, + String hostname) throws IOException; + + /** + * Kill the service running on the given host + */ + public void kill(ServiceType service, String hostname) throws IOException { + signal(service, SIGKILL, hostname); + } + + /** + * Suspend the service running on the given host + */ + public void suspend(ServiceType service, String hostname) throws IOException { + signal(service, SIGSTOP, hostname); + } + + /** + * Resume the service running on the given host + */ + public void resume(ServiceType service, String hostname) throws IOException { + signal(service, SIGCONT, hostname); + } + + /** + * Returns whether the service is running on the remote host. This only checks whether the + * service still has a pid. + */ + public abstract boolean isRunning(ServiceType service, String hostname) throws IOException; + + /* TODO: further API ideas: + * + * //return services running on host: + * ServiceType[] getRunningServicesOnHost(String hostname); + * + * //return which services can be run on host (for example, to query whether hmaster can run on this host) + * ServiceType[] getRunnableServicesOnHost(String hostname); + * + * //return which hosts can run this service + * String[] getRunnableHostsForService(ServiceType service); + */ + +} \ No newline at end of file diff --git a/hbase-it/src/test/java/org/apache/hadoop/hbase/DistributedHBaseCluster.java b/hbase-it/src/test/java/org/apache/hadoop/hbase/DistributedHBaseCluster.java new file mode 100644 index 00000000000..0095005aefb --- /dev/null +++ b/hbase-it/src/test/java/org/apache/hadoop/hbase/DistributedHBaseCluster.java @@ -0,0 +1,287 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License.
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase; + +import java.io.IOException; +import java.util.HashMap; + +import org.apache.hadoop.classification.InterfaceAudience; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hbase.ClusterManager.ServiceType; +import org.apache.hadoop.hbase.client.AdminProtocol; +import org.apache.hadoop.hbase.client.ClientProtocol; +import org.apache.hadoop.hbase.client.HBaseAdmin; +import org.apache.hadoop.hbase.client.HConnection; +import org.apache.hadoop.hbase.client.HConnectionManager; +import org.apache.hadoop.hbase.protobuf.ProtobufUtil; +import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.ServerInfo; +import org.apache.hadoop.hbase.util.Threads; + +import com.google.common.collect.Sets; + +/** + * Manages the interactions with an already deployed distributed cluster (as opposed to + * a pseudo-distributed, or mini/local cluster). This is used by integration and system tests. + */ +@InterfaceAudience.Private +public class DistributedHBaseCluster extends HBaseCluster { + + private HBaseAdmin admin; + + private ClusterManager clusterManager; + + public DistributedHBaseCluster(Configuration conf, ClusterManager clusterManager) + throws IOException { + super(conf); + this.clusterManager = clusterManager; + this.admin = new HBaseAdmin(conf); + this.initialClusterStatus = getClusterStatus(); + } + + public void setClusterManager(ClusterManager clusterManager) { + this.clusterManager = clusterManager; + } + + public ClusterManager getClusterManager() { + return clusterManager; + } + + /** + * Returns a ClusterStatus for this HBase cluster + * @throws IOException + */ + @Override + public ClusterStatus getClusterStatus() throws IOException { + return admin.getClusterStatus(); + } + + @Override + public ClusterStatus getInitialClusterStatus() throws IOException { + return initialClusterStatus; + } + + @Override + public void close() throws IOException { + if (this.admin != null) { + admin.close(); + } + } + + @Override + public AdminProtocol getAdminProtocol(ServerName serverName) throws IOException { + return admin.getConnection().getAdmin(serverName.getHostname(), serverName.getPort()); + } + + @Override + public ClientProtocol getClientProtocol(ServerName serverName) throws IOException { + return admin.getConnection().getClient(serverName.getHostname(), serverName.getPort()); + } + + @Override + public void startRegionServer(String hostname) throws IOException { + LOG.info("Starting RS on: " + hostname); + clusterManager.start(ServiceType.HBASE_REGIONSERVER, hostname); + } + + @Override + public void killRegionServer(ServerName serverName) throws IOException { + LOG.info("Aborting RS: " + serverName.getServerName()); + clusterManager.kill(ServiceType.HBASE_REGIONSERVER, serverName.getHostname()); + } + + @Override + public void stopRegionServer(ServerName serverName) throws IOException { + LOG.info("Stopping RS: " + serverName.getServerName()); + clusterManager.stop(ServiceType.HBASE_REGIONSERVER, serverName.getHostname()); + } + + @Override + public void waitForRegionServerToStop(ServerName serverName, long timeout) throws 
IOException { + waitForServiceToStop(ServiceType.HBASE_REGIONSERVER, serverName, timeout); + } + + private void waitForServiceToStop(ServiceType service, ServerName serverName, long timeout) + throws IOException { + LOG.info("Waiting for service:" + service + " to stop: " + serverName.getServerName()); + long start = System.currentTimeMillis(); + + while ((System.currentTimeMillis() - start) < timeout) { + if (!clusterManager.isRunning(service, serverName.getHostname())) { + return; + } + Threads.sleep(1000); + } + throw new IOException("Timed out waiting for service to stop:" + serverName); + } + + @Override + public MasterAdminProtocol getMasterAdmin() throws IOException { + HConnection conn = HConnectionManager.getConnection(conf); + return conn.getMasterAdmin(); + } + + @Override + public MasterMonitorProtocol getMasterMonitor() throws IOException { + HConnection conn = HConnectionManager.getConnection(conf); + return conn.getMasterMonitor(); + } + + @Override + public void startMaster(String hostname) throws IOException { + LOG.info("Starting Master on: " + hostname); + clusterManager.start(ServiceType.HBASE_MASTER, hostname); + } + + @Override + public void killMaster(ServerName serverName) throws IOException { + LOG.info("Aborting Master: " + serverName.getServerName()); + clusterManager.kill(ServiceType.HBASE_MASTER, serverName.getHostname()); + } + + @Override + public void stopMaster(ServerName serverName) throws IOException { + LOG.info("Stopping Master: " + serverName.getServerName()); + clusterManager.stop(ServiceType.HBASE_MASTER, serverName.getHostname()); + } + + @Override + public void waitForMasterToStop(ServerName serverName, long timeout) throws IOException { + waitForServiceToStop(ServiceType.HBASE_MASTER, serverName, timeout); + } + + @Override + public boolean waitForActiveAndReadyMaster(long timeout) throws IOException { + long start = System.currentTimeMillis(); + while (System.currentTimeMillis() - start < timeout) { + try { + getMasterAdmin(); + return true; + } catch (MasterNotRunningException m) { + LOG.warn("Master not started yet " + m); + } catch (ZooKeeperConnectionException e) { + LOG.warn("Failed to connect to ZK " + e); + } + Threads.sleep(1000); + } + return false; + } + + @Override + public ServerName getServerHoldingRegion(byte[] regionName) throws IOException { + HConnection connection = admin.getConnection(); + HRegionLocation regionLoc = connection.locateRegion(regionName); + if (regionLoc == null) { + return null; + } + + AdminProtocol client = connection.getAdmin(regionLoc.getHostname(), regionLoc.getPort()); + ServerInfo info = ProtobufUtil.getServerInfo(client); + return ProtobufUtil.toServerName(info.getServerName()); + } + + @Override + public void waitUntilShutDown() { + //Simply wait for a few seconds for now (after issuing serverManager.kill) + throw new RuntimeException("Not implemented yet"); + } + + @Override + public void shutdown() throws IOException { + //not sure we want this + throw new RuntimeException("Not implemented yet"); + } + + @Override + public boolean isDistributedCluster() { + return true; + } + + @Override + public void restoreClusterStatus(ClusterStatus initial) throws IOException { + //TODO: caution: not tested thoroughly + ClusterStatus current = getClusterStatus(); + + //restore masters + + //check whether current master has changed + if (!ServerName.isSameHostnameAndPort(initial.getMaster(), current.getMaster())) { + //master has changed, we would like to undo this. + //1. Kill the current backups + //2.
Stop current master + //3. Start a master at the initial hostname (if not already running as backup) + //4. Start backup masters + boolean foundOldMaster = false; + for (ServerName currentBackup : current.getBackupMasters()) { + if (!ServerName.isSameHostnameAndPort(currentBackup, initial.getMaster())) { + stopMaster(currentBackup); + } else { + foundOldMaster = true; + } + } + stopMaster(current.getMaster()); + if (!foundOldMaster) { //if initial master is not running as a backup + startMaster(initial.getMaster().getHostname()); + } + waitForActiveAndReadyMaster(); //wait so that active master takes over + + //start backup masters + for (ServerName backup : initial.getBackupMasters()) { + //these are not started in backup mode, but we should already have an active master + startMaster(backup.getHostname()); + } + } else { + //current master has not changed, match up backup masters + HashMap<String, ServerName> initialBackups = new HashMap<String, ServerName>(); + HashMap<String, ServerName> currentBackups = new HashMap<String, ServerName>(); + + for (ServerName server : initial.getBackupMasters()) { + initialBackups.put(server.getHostname(), server); + } + for (ServerName server : current.getBackupMasters()) { + currentBackups.put(server.getHostname(), server); + } + + for (String hostname : Sets.difference(initialBackups.keySet(), currentBackups.keySet())) { + startMaster(hostname); + } + + for (String hostname : Sets.difference(currentBackups.keySet(), initialBackups.keySet())) { + stopMaster(currentBackups.get(hostname)); + } + } + + //restore region servers + HashMap<String, ServerName> initialServers = new HashMap<String, ServerName>(); + HashMap<String, ServerName> currentServers = new HashMap<String, ServerName>(); + + for (ServerName server : initial.getServers()) { + initialServers.put(server.getHostname(), server); + } + for (ServerName server : current.getServers()) { + currentServers.put(server.getHostname(), server); + } + + for (String hostname : Sets.difference(initialServers.keySet(), currentServers.keySet())) { + startRegionServer(hostname); + } + + for (String hostname : Sets.difference(currentServers.keySet(), initialServers.keySet())) { + stopRegionServer(currentServers.get(hostname)); + } + } +} \ No newline at end of file diff --git a/hbase-it/src/test/java/org/apache/hadoop/hbase/HBaseClusterManager.java b/hbase-it/src/test/java/org/apache/hadoop/hbase/HBaseClusterManager.java new file mode 100644 index 00000000000..7803d4141cd --- /dev/null +++ b/hbase-it/src/test/java/org/apache/hadoop/hbase/HBaseClusterManager.java @@ -0,0 +1,213 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License.
+ */ + +package org.apache.hadoop.hbase; + +import java.io.File; +import java.io.IOException; +import java.util.Map; + +import org.apache.commons.lang.StringUtils; +import org.apache.hadoop.classification.InterfaceAudience; +import org.apache.hadoop.hbase.HBaseClusterManager.CommandProvider.Operation; +import org.apache.hadoop.hbase.util.Pair; +import org.apache.hadoop.util.Shell; + +/** + * A default cluster manager for HBase. Uses SSH and HBase shell scripts + * to manage the cluster. Assumes Unix-like commands such as 'ps' and + * 'kill' are available. Also assumes the user running the test has enough "power" to start & stop + * servers on the remote machines (for example, the test user could be the same user as the + * user the daemon is running as). + */ +@InterfaceAudience.Private +public class HBaseClusterManager extends ClusterManager { + + /** + * Executes commands over SSH + */ + static class RemoteShell extends Shell.ShellCommandExecutor { + + private String hostname; + + private String sshCmd = "/usr/bin/ssh"; + private String sshOptions = System.getenv("HBASE_SSH_OPTS"); //from conf/hbase-env.sh + + public RemoteShell(String hostname, String[] execString, File dir, Map<String, String> env, + long timeout) { + super(execString, dir, env, timeout); + this.hostname = hostname; + } + + public RemoteShell(String hostname, String[] execString, File dir, Map<String, String> env) { + super(execString, dir, env); + this.hostname = hostname; + } + + public RemoteShell(String hostname, String[] execString, File dir) { + super(execString, dir); + this.hostname = hostname; + } + + public RemoteShell(String hostname, String[] execString) { + super(execString); + this.hostname = hostname; + } + + @Override + protected String[] getExecString() { + return new String[] { + "bash", "-c", + StringUtils.join(new String[] { sshCmd, + sshOptions == null ? "" : sshOptions, + hostname, + "\"" + StringUtils.join(super.getExecString(), " ") + "\"" + }, " ")}; + } + + @Override + public void execute() throws IOException { + super.execute(); + } + + public void setSshCmd(String sshCmd) { + this.sshCmd = sshCmd; + } + + public void setSshOptions(String sshOptions) { + this.sshOptions = sshOptions; + } + + public String getSshCmd() { + return sshCmd; + } + + public String getSshOptions() { + return sshOptions; + } + } + + /** + * Provides command strings for services to be executed by Shell. CommandProviders are + * pluggable, and different deployments (Windows, Bigtop, etc.) can be managed by + * plugging in custom CommandProviders or ClusterManagers.
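+ * <p> + * A sketch of a custom provider (the class name and init-script command below are hypothetical, + * shown only to illustrate the plug-in point): + * <pre> + *   static class InitScriptCommandProvider extends CommandProvider { + *     @Override + *     public String getCommand(ServiceType service, Operation op) { + *       return String.format("/etc/init.d/hbase-%s %s", service, op.toString().toLowerCase()); + *     } + *   } + * </pre>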
+ */ + static abstract class CommandProvider { + + enum Operation { + START, STOP, RESTART + } + + public abstract String getCommand(ServiceType service, Operation op); + + public String isRunningCommand(ServiceType service) { + return findPidCommand(service); + } + + protected String findPidCommand(ServiceType service) { + return String.format("ps aux | grep proc_%s | grep -v grep | tr -s ' ' | cut -d ' ' -f2", + service); + } + + public String signalCommand(ServiceType service, String signal) { + return String.format("%s | xargs kill -s %s", findPidCommand(service), signal); + } + } + + /** + * CommandProvider to manage the service using bin/hbase-* scripts + */ + static class HBaseShellCommandProvider extends CommandProvider { + private String getHBaseHome() { + return System.getenv("HBASE_HOME"); + } + + private String getConfig() { + String confDir = System.getenv("HBASE_CONF_DIR"); + if (confDir != null) { + return String.format("--config %s", confDir); + } + return ""; + } + + @Override + public String getCommand(ServiceType service, Operation op) { + return String.format("%s/bin/hbase-daemon.sh %s %s %s", getHBaseHome(), getConfig(), + op.toString().toLowerCase(), service); + } + } + + public HBaseClusterManager() { + super(); + } + + protected CommandProvider getCommandProvider(ServiceType service) { + //TODO: make it pluggable, or auto-detect the best command provider, should work with + //hadoop daemons as well + return new HBaseShellCommandProvider(); + } + + /** + * Execute the given command on the host using SSH + * @return pair of exit code and command output + * @throws IOException if something goes wrong. + */ + private Pair<Integer, String> exec(String hostname, String... cmd) throws IOException { + LOG.info("Executing remote command: " + StringUtils.join(cmd, " ") + " , hostname:" + hostname); + + RemoteShell shell = new RemoteShell(hostname, cmd); + shell.execute(); + + LOG.info("Executed remote command, exit code:" + shell.getExitCode() + + " , output:" + shell.getOutput()); + + return new Pair<Integer, String>(shell.getExitCode(), shell.getOutput()); + } + + private void exec(String hostname, ServiceType service, Operation op) throws IOException { + exec(hostname, getCommandProvider(service).getCommand(service, op)); + } + + @Override + public void start(ServiceType service, String hostname) throws IOException { + exec(hostname, service, Operation.START); + } + + @Override + public void stop(ServiceType service, String hostname) throws IOException { + exec(hostname, service, Operation.STOP); + } + + @Override + public void restart(ServiceType service, String hostname) throws IOException { + exec(hostname, service, Operation.RESTART); + } + + @Override + public void signal(ServiceType service, String signal, String hostname) throws IOException { + exec(hostname, getCommandProvider(service).signalCommand(service, signal)); + } + + @Override + public boolean isRunning(ServiceType service, String hostname) throws IOException { + String ret = exec(hostname, getCommandProvider(service).isRunningCommand(service)) + .getSecond(); + return ret.length() > 0; + } + +} diff --git a/hbase-it/src/test/java/org/apache/hadoop/hbase/IntegrationTestDataIngestWithChaosMonkey.java b/hbase-it/src/test/java/org/apache/hadoop/hbase/IntegrationTestDataIngestWithChaosMonkey.java new file mode 100644 index 00000000000..72f76b0dcfe --- /dev/null +++ b/hbase-it/src/test/java/org/apache/hadoop/hbase/IntegrationTestDataIngestWithChaosMonkey.java @@ -0,0 +1,133 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.hbase; + +import java.io.IOException; + +import junit.framework.Assert; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.hbase.util.Bytes; +import org.apache.hadoop.hbase.util.ChaosMonkey; +import org.apache.hadoop.hbase.util.LoadTestTool; +import org.junit.After; +import org.junit.Before; +import org.junit.Test; +import org.junit.experimental.categories.Category; + +/** + * A system test which does large data ingestion and verifies it using {@link LoadTestTool}, + * while killing region servers and master(s) randomly. You can configure how long the + * load test should run via the "hbase.IntegrationTestDataIngestWithChaosMonkey.runtime" + * configuration parameter. + */ +@Category(IntegrationTests.class) +public class IntegrationTestDataIngestWithChaosMonkey { + + private static final String TABLE_NAME = "TestDataIngestWithChaosMonkey"; + private static final int NUM_SLAVES_BASE = 4; //number of slaves for the smallest cluster + + /** A soft limit on how long we should run */ + private static final String RUN_TIME_KEY = "hbase.IntegrationTestDataIngestWithChaosMonkey.runtime"; + + //run for 5 min by default + private static final long DEFAULT_RUN_TIME = 5 * 60 * 1000; + + private static final Log LOG = LogFactory.getLog(IntegrationTestDataIngestWithChaosMonkey.class); + private IntegrationTestingUtility util; + private HBaseCluster cluster; + private ChaosMonkey monkey; + + @Before + public void setUp() throws Exception { + util = new IntegrationTestingUtility(); + + util.initializeCluster(NUM_SLAVES_BASE); + + cluster = util.getHBaseClusterInterface(); + deleteTableIfNecessary(); + + monkey = new ChaosMonkey(util, ChaosMonkey.EVERY_MINUTE_RANDOM_ACTION_POLICY); + monkey.start(); + } + + @After + public void tearDown() throws Exception { + monkey.stop("test has finished, that's why"); + monkey.waitForStop(); + util.restoreCluster(); + } + + private void deleteTableIfNecessary() throws IOException { + if (util.getHBaseAdmin().tableExists(TABLE_NAME)) { + util.deleteTable(Bytes.toBytes(TABLE_NAME)); + } + } + + @Test + public void testDataIngest() throws Exception { + LOG.info("Running testDataIngest"); + LOG.info("Cluster size:" + util.getHBaseClusterInterface().getClusterStatus().getServersSize()); + + LoadTestTool loadTool = new LoadTestTool(); + loadTool.setConf(util.getConfiguration()); + + long start = System.currentTimeMillis(); + long runtime = util.getConfiguration().getLong(RUN_TIME_KEY, DEFAULT_RUN_TIME); + long startKey = 0; + + long numKeys = estimateDataSize(); + while (System.currentTimeMillis() - start < 0.9 * runtime) { + LOG.info("Intended run time: " + (runtime/60000) + " min, left:" + + ((runtime - (System.currentTimeMillis() -
start))/60000) + " min"); + + int ret = loadTool.run(new String[] { + "-tn", TABLE_NAME, + "-write", "10:100:20", + "-start_key", String.valueOf(startKey), + "-num_keys", String.valueOf(numKeys) + }); + + //assert that load was successful + Assert.assertEquals(0, ret); + + ret = loadTool.run(new String[] { + "-tn", TABLE_NAME, + "-read", "100:20", + "-start_key", String.valueOf(startKey), + "-num_keys", String.valueOf(numKeys) + }); + + //assert that verify was successful + Assert.assertEquals(0, ret); + startKey += numKeys; + } + } + + /** Estimates a data size based on the cluster size */ + protected long estimateDataSize() throws IOException { + //base is a 4 slave node cluster. + ClusterStatus status = cluster.getClusterStatus(); + int numRegionServers = status.getServersSize(); + int multiplication = Math.max(1, numRegionServers / NUM_SLAVES_BASE); + + return 10000 * multiplication; + } +} diff --git a/hbase-it/src/test/java/org/apache/hadoop/hbase/IntegrationTestingUtility.java b/hbase-it/src/test/java/org/apache/hadoop/hbase/IntegrationTestingUtility.java new file mode 100644 index 00000000000..d5a6feeaf12 --- /dev/null +++ b/hbase-it/src/test/java/org/apache/hadoop/hbase/IntegrationTestingUtility.java @@ -0,0 +1,130 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.hbase; + +import java.io.IOException; + +import org.apache.hadoop.conf.Configuration; + +/** + * Facility for integration/system tests. This extends {@link HBaseTestingUtility} + * and adds-in the functionality needed by integration and system tests. This class understands + * distributed and pseudo-distributed/local cluster deployments, and abstracts those from the tests + * in this module. + *

+ * IntegrationTestingUtility is constructed and used by the integration tests, but the tests + * themselves should not assume a particular deployment. They can rely on the methods in this + * class and HBaseCluster. Before the testing begins, the test should initialize the cluster by + * calling {@link #initializeCluster(int)}. + *
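+ * <p> + * A minimal lifecycle sketch (the node count below is illustrative): + * <pre> + *   IntegrationTestingUtility util = new IntegrationTestingUtility(); + *   util.initializeCluster(3); + *   try { + *     // drive the workload via util.getHBaseClusterInterface() + *   } finally { + *     util.restoreCluster(); + *   } + * </pre>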

+ * The cluster that is used defaults to a mini cluster, but it can be forced to use a distributed + * cluster by calling {@link #setUseDistributedCluster(Configuration)}. This method is invoked by + * test drivers (maven, IntegrationTestsDriver, etc.) before initializing the cluster + * via {@link #initializeCluster(int)}. Individual tests should not directly call + * {@link #setUseDistributedCluster(Configuration)}. + */ +public class IntegrationTestingUtility extends HBaseTestingUtility { + + public IntegrationTestingUtility() { + this(HBaseConfiguration.create()); + } + + public IntegrationTestingUtility(Configuration conf) { + super(conf); + } + + /** + * Configuration that controls whether this utility assumes a running/deployed cluster. + * This is different from "hbase.cluster.distributed" since that parameter indicates whether the + * cluster is in an actual distributed environment, while this shows that there is a + * deployed (distributed or pseudo-distributed) cluster running, and we do not need to + * start a mini-cluster for tests. + */ + public static final String IS_DISTRIBUTED_CLUSTER = "hbase.test.cluster.distributed"; + + /** + * Initializes the state of the cluster. It starts a new in-process mini cluster, OR + * if we are given an already deployed distributed cluster it initializes the state. + * @param numSlaves Number of slaves to start up if we are booting a mini cluster. Otherwise + * we check whether this many nodes are available and throw an exception if not. + */ + public void initializeCluster(int numSlaves) throws Exception { + if (isDistributedCluster()) { + createDistributedHBaseCluster(); + checkNodeCount(numSlaves); + } else { + startMiniCluster(numSlaves); + } + } + + /** + * Checks whether the cluster has at least numSlaves nodes, and throws an + * exception otherwise. + */ + public void checkNodeCount(int numSlaves) throws Exception { + HBaseCluster cluster = getHBaseClusterInterface(); + if (cluster.getClusterStatus().getServers().size() < numSlaves) { + throw new Exception("Cluster does not have enough nodes:" + numSlaves); + } + } + + /** + * Restores the cluster to the initial state if it is a distributed cluster, otherwise shuts down + * the mini cluster. + */ + public void restoreCluster() throws IOException { + if (isDistributedCluster()) { + getHBaseClusterInterface().restoreInitialStatus(); + } else { + getMiniHBaseCluster().shutdown(); + } + } + + /** + * Sets the configuration property to use a distributed cluster for the integration tests. Test drivers + * should use this to enforce cluster deployment. + */ + public static void setUseDistributedCluster(Configuration conf) { + conf.setBoolean(IS_DISTRIBUTED_CLUSTER, true); + System.setProperty(IS_DISTRIBUTED_CLUSTER, "true"); + } + + /** + * @return whether we are interacting with a distributed cluster as opposed to an in-process mini + * cluster or a local cluster.
+ * @see IntegrationTestingUtility#setUseDistributedCluster(Configuration) + */ + private boolean isDistributedCluster() { + Configuration conf = getConfiguration(); + boolean isDistributedCluster = false; + isDistributedCluster = Boolean.parseBoolean(System.getProperty(IS_DISTRIBUTED_CLUSTER, "false")); + if (!isDistributedCluster) { + isDistributedCluster = conf.getBoolean(IS_DISTRIBUTED_CLUSTER, false); + } + return isDistributedCluster; + } + + private void createDistributedHBaseCluster() throws IOException { + Configuration conf = getConfiguration(); + ClusterManager clusterManager = new HBaseClusterManager(); + setHBaseCluster(new DistributedHBaseCluster(conf, clusterManager)); + getHBaseAdmin(); + } + +} diff --git a/hbase-it/src/test/java/org/apache/hadoop/hbase/IntegrationTestsDriver.java b/hbase-it/src/test/java/org/apache/hadoop/hbase/IntegrationTestsDriver.java new file mode 100644 index 00000000000..fd6053f41b4 --- /dev/null +++ b/hbase-it/src/test/java/org/apache/hadoop/hbase/IntegrationTestsDriver.java @@ -0,0 +1,73 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.hbase; + +import java.io.IOException; +import java.util.List; + +import org.apache.commons.cli.CommandLine; +import org.apache.hadoop.hbase.util.AbstractHBaseTool; +import org.apache.hadoop.util.ToolRunner; +import org.junit.internal.TextListener; +import org.junit.runner.JUnitCore; +import org.junit.runner.Result; + +/** + * This class drives the Integration test suite execution. Executes all + * tests having @Category(IntegrationTests.class) annotation against an + * already deployed distributed cluster. + */ +public class IntegrationTestsDriver extends AbstractHBaseTool { + + public static void main(String[] args) throws Exception { + int ret = ToolRunner.run(new IntegrationTestsDriver(), args); + System.exit(ret); + } + + @Override + protected void addOptions() { + } + + @Override + protected void processOptions(CommandLine cmd) { + } + + /** + * Returns test classes annotated with @Category(IntegrationTests.class) + */ + private Class<?>[] findIntegrationTestClasses() throws ClassNotFoundException, IOException { + TestCheckTestClasses util = new TestCheckTestClasses(); + List<Class<?>> classes = util.findTestClasses(IntegrationTests.class); + return classes.toArray(new Class<?>[classes.size()]); + } + + @Override + protected int doWork() throws Exception { + + //this is called from the command line, so we should set to use the distributed cluster + IntegrationTestingUtility.setUseDistributedCluster(conf); + + JUnitCore junit = new JUnitCore(); + junit.addListener(new TextListener(System.out)); + Result result = junit.run(findIntegrationTestClasses()); + + return result.wasSuccessful() ?
0 : 1; } } \ No newline at end of file diff --git a/hbase-it/src/test/java/org/apache/hadoop/hbase/util/ChaosMonkey.java b/hbase-it/src/test/java/org/apache/hadoop/hbase/util/ChaosMonkey.java new file mode 100644 index 00000000000..2d67a74c90c --- /dev/null +++ b/hbase-it/src/test/java/org/apache/hadoop/hbase/util/ChaosMonkey.java @@ -0,0 +1,563 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.hbase.util; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.Collection; +import java.util.LinkedList; +import java.util.List; +import java.util.Map; +import java.util.Queue; +import java.util.Random; + +import org.apache.commons.cli.CommandLine; +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hbase.ClusterStatus; +import org.apache.hadoop.hbase.HBaseCluster; +import org.apache.hadoop.hbase.HBaseConfiguration; +import org.apache.hadoop.hbase.IntegrationTestingUtility; +import org.apache.hadoop.hbase.IntegrationTestDataIngestWithChaosMonkey; +import org.apache.hadoop.hbase.ServerName; +import org.apache.hadoop.hbase.Stoppable; +import org.apache.hadoop.util.StringUtils; +import org.apache.hadoop.util.ToolRunner; + +import com.google.common.collect.Lists; +import com.google.common.collect.Maps; + +/** + * A utility to inject faults in a running cluster. + *

+ * ChaosMonkey defines Actions and Policies. Actions are sequences of events, such as: + * - Select a random server to kill + * - Sleep for 5 sec + * - Start the server on the same host + * Actions can also be complex events, like a rolling restart of all of the servers. + *

+ * Policies on the other hand are responsible for executing the actions based on a strategy. + * The default policy is to execute a random action every minute based on predefined action + * weights. ChaosMonkey executes predefined named policies until it is stopped. More than one + * policy can be active at any time. + *

+ * Chaos monkey can be run from the command line, or can be invoked from integration tests. + * See {@link IntegrationTestDataIngestWithChaosMonkey} or other integration tests that use + * chaos monkey for code examples. + *
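+ * <p> + * A minimal sketch of programmatic use from a test (mirroring the integration test above): + * <pre> + *   ChaosMonkey monkey = new ChaosMonkey(util, ChaosMonkey.EVERY_MINUTE_RANDOM_ACTION_POLICY); + *   monkey.start(); + *   // run and verify the workload while the monkey injects faults + *   monkey.stop("test has finished"); + *   monkey.waitForStop(); + * </pre>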

+ * The ChaosMonkey class is inspired by Netflix's same-named tool: + * http://techblog.netflix.com/2012/07/chaos-monkey-released-into-wild.html + */ +public class ChaosMonkey extends AbstractHBaseTool implements Stoppable { + + private static final Log LOG = LogFactory.getLog(ChaosMonkey.class); + + private static final long ONE_SEC = 1000; + private static final long FIVE_SEC = 5 * ONE_SEC; + private static final long ONE_MIN = 60 * ONE_SEC; + private static final long TIMEOUT = ONE_MIN; + + final IntegrationTestingUtility util; + + /** + * Construct a new ChaosMonkey + * @param util the IntegrationTestingUtility, already configured + * @param policies names of pre-defined policies to use + */ + public ChaosMonkey(IntegrationTestingUtility util, String... policies) { + this.util = util; + setPoliciesByName(policies); + } + + private void setPoliciesByName(String... policies) { + this.policies = new Policy[policies.length]; + for (int i=0; i < policies.length; i++) { + this.policies[i] = NAMED_POLICIES.get(policies[i]); + } + } + + /** + * Context for Actions + */ + private static class ActionContext { + private IntegrationTestingUtility util; + + ActionContext(IntegrationTestingUtility util) { + this.util = util; + } + + IntegrationTestingUtility getHBaseIntegrationTestingUtility() { + return util; + } + + HBaseCluster getHBaseCluster() { + return util.getHBaseClusterInterface(); + } + } + + /** + * A (possibly mischievous) action that the ChaosMonkey can perform. + */ + private static class Action { + long sleepTime; //how long should we sleep + ActionContext context; + HBaseCluster cluster; + ClusterStatus initialStatus; + ServerName[] initialServers; + + public Action(long sleepTime) { + this.sleepTime = sleepTime; + } + + void init(ActionContext context) throws Exception { + this.context = context; + cluster = context.getHBaseCluster(); + initialStatus = cluster.getInitialClusterStatus(); + Collection<ServerName> regionServers = initialStatus.getServers(); + initialServers = regionServers.toArray(new ServerName[regionServers.size()]); + } + + void perform() throws Exception { } + + /** Returns current region servers */ + ServerName[] getCurrentServers() throws IOException { + Collection<ServerName> regionServers = cluster.getClusterStatus().getServers(); + return regionServers.toArray(new ServerName[regionServers.size()]); + } + + void killMaster(ServerName server) throws IOException { + LOG.info("Killing master:" + server); + cluster.killMaster(server); + cluster.waitForMasterToStop(server, TIMEOUT); + LOG.info("Killed master server:" + server); + } + + void startMaster(ServerName server) throws IOException { + LOG.info("Starting master:" + server.getHostname()); + cluster.startMaster(server.getHostname()); + cluster.waitForActiveAndReadyMaster(TIMEOUT); + LOG.info("Started master: " + server); + } + + void restartMaster(ServerName server, long sleepTime) throws IOException { + killMaster(server); + sleep(sleepTime); + startMaster(server); + } + + void killRs(ServerName server) throws IOException { + LOG.info("Killing region server:" + server); + cluster.killRegionServer(server); + cluster.waitForRegionServerToStop(server, TIMEOUT); + LOG.info("Killed region server:" + server + ".
Reported num of rs:" + + cluster.getClusterStatus().getServersSize()); + } + + void startRs(ServerName server) throws IOException { + LOG.info("Starting region server:" + server.getHostname()); + cluster.startRegionServer(server.getHostname()); + cluster.waitForRegionServerToStart(server.getHostname(), TIMEOUT); + LOG.info("Started region server:" + server + ". Reported num of rs:" + + cluster.getClusterStatus().getServersSize()); + } + + void sleep(long sleepTime) { + LOG.info("Sleeping for:" + sleepTime); + Threads.sleep(sleepTime); + } + + void restartRs(ServerName server, long sleepTime) throws IOException { + killRs(server); + sleep(sleepTime); + startRs(server); + } + } + + private static class RestartActiveMaster extends Action { + public RestartActiveMaster(long sleepTime) { + super(sleepTime); + } + @Override + void perform() throws Exception { + LOG.info("Performing action: Restart active master"); + + ServerName master = cluster.getClusterStatus().getMaster(); + restartMaster(master, sleepTime); + } + } + + private static class RestartRandomRs extends Action { + public RestartRandomRs(long sleepTime) { + super(sleepTime); + } + + @Override + void init(ActionContext context) throws Exception { + super.init(context); + } + + @Override + void perform() throws Exception { + LOG.info("Performing action: Restart random region server"); + ServerName server = selectRandomItem(getCurrentServers()); + + restartRs(server, sleepTime); + } + } + + private static class RestartRsHoldingMeta extends RestartRandomRs { + public RestartRsHoldingMeta(long sleepTime) { + super(sleepTime); + } + @Override + void perform() throws Exception { + LOG.info("Performing action: Restart region server holding META"); + ServerName server = cluster.getServerHoldingMeta(); + if (server == null) { + LOG.warn("No server is holding .META. right now."); + return; + } + restartRs(server, sleepTime); + } + } + + private static class RestartRsHoldingRoot extends RestartRandomRs { + public RestartRsHoldingRoot(long sleepTime) { + super(sleepTime); + } + @Override + void perform() throws Exception { + LOG.info("Performing action: Restart region server holding ROOT"); + ServerName server = cluster.getServerHoldingRoot(); + if (server == null) { + LOG.warn("No server is holding -ROOT- right now."); + return; + } + restartRs(server, sleepTime); + } + } + + /** + * Restarts a ratio of the running regionservers at the same time + */ + private static class BatchRestartRs extends Action { + float ratio; //ratio of regionservers to restart + + public BatchRestartRs(long sleepTime, float ratio) { + super(sleepTime); + this.ratio = ratio; + } + + @Override + void init(ActionContext context) throws Exception { + super.init(context); + } + + @Override + void perform() throws Exception { + LOG.info(String.format("Performing action: Batch restarting %d%% of region servers", + (int)(ratio * 100))); + List<ServerName> selectedServers = selectRandomItems(getCurrentServers(), ratio); + + for (ServerName server : selectedServers) { + LOG.info("Killing region server:" + server); + cluster.killRegionServer(server); + } + + for (ServerName server : selectedServers) { + cluster.waitForRegionServerToStop(server, TIMEOUT); + } + + LOG.info("Killed " + selectedServers.size() + " region servers.
Reported num of rs:" + + cluster.getClusterStatus().getServersSize()); + + sleep(sleepTime); + + for (ServerName server : selectedServers) { + LOG.info("Starting region server:" + server.getHostname()); + cluster.startRegionServer(server.getHostname()); + + } + for (ServerName server : selectedServers) { + cluster.waitForRegionServerToStart(server.getHostname(), TIMEOUT); + } + LOG.info("Started " + selectedServers.size() +" region servers. Reported num of rs:" + + cluster.getClusterStatus().getServersSize()); + } + } + + /** + * Restarts a ratio of the regionservers in a rolling fashion. At each step, either kills a + * server, or starts one, sleeping randomly (0-sleepTime) in between steps. + */ + private static class RollingBatchRestartRs extends BatchRestartRs { + public RollingBatchRestartRs(long sleepTime, float ratio) { + super(sleepTime, ratio); + } + + @Override + void perform() throws Exception { + LOG.info(String.format("Performing action: Rolling batch restarting %d%% of region servers", + (int)(ratio * 100))); + Random random = new Random(); + List<ServerName> selectedServers = selectRandomItems(getCurrentServers(), ratio); + + Queue<ServerName> serversToBeKilled = new LinkedList<ServerName>(selectedServers); + Queue<ServerName> deadServers = new LinkedList<ServerName>(); + + //loop while there are servers to be killed or dead servers to be restarted + while (!serversToBeKilled.isEmpty() || !deadServers.isEmpty()) { + boolean action = true; //action true = kill server, false = start server + + if (serversToBeKilled.isEmpty() || deadServers.isEmpty()) { + action = deadServers.isEmpty(); + } else { + action = random.nextBoolean(); + } + + if (action) { + ServerName server = serversToBeKilled.remove(); + killRs(server); + deadServers.add(server); + } else { + ServerName server = deadServers.remove(); + startRs(server); + } + + sleep(random.nextInt((int)sleepTime)); + } + } + } + + /** + * A context for a Policy + */ + private static class PolicyContext extends ActionContext { + PolicyContext(IntegrationTestingUtility util) { + super(util); + } + } + + /** + * A policy to introduce chaos to the cluster + */ + private static abstract class Policy extends StoppableImplementation implements Runnable { + PolicyContext context; + public void init(PolicyContext context) throws Exception { + this.context = context; + } + } + + /** + * A policy that picks a random action according to the given weights, + * and performs it every configurable period.
+ */ + private static class PeriodicRandomActionPolicy extends Policy { + private long period; + private List<Pair<Action, Integer>> actions; + + PeriodicRandomActionPolicy(long period, List<Pair<Action, Integer>> actions) { + this.period = period; + this.actions = actions; + } + + @Override + public void run() { + //add some jitter + int jitter = new Random().nextInt((int)period); + LOG.info("Sleeping for " + jitter + " to add jitter"); + Threads.sleep(jitter); + + while (!isStopped()) { + long start = System.currentTimeMillis(); + Action action = selectWeightedRandomItem(actions); + + try { + action.perform(); + } catch (Exception ex) { + LOG.warn("Exception occurred during performing action: " + + StringUtils.stringifyException(ex)); + } + + long sleepTime = period - (System.currentTimeMillis() - start); + if (sleepTime > 0) { + LOG.info("Sleeping for:" + sleepTime); + Threads.sleep(sleepTime); + } + } + } + + @Override + public void init(PolicyContext context) throws Exception { + super.init(context); + LOG.info("Using ChaosMonkey Policy: " + this.getClass() + ", period:" + period); + for (Pair<Action, Integer> action : actions) { + action.getFirst().init(this.context); + } + } + } + + /** Selects a random item from the given items */ + static <T> T selectRandomItem(T[] items) { + Random random = new Random(); + return items[random.nextInt(items.length)]; + } + + /** Selects a random item from the given items with weights */ + static <T> T selectWeightedRandomItem(List<Pair<T, Integer>> items) { + Random random = new Random(); + int totalWeight = 0; + for (Pair<T, Integer> pair : items) { + totalWeight += pair.getSecond(); + } + + int cutoff = random.nextInt(totalWeight); + int cumulative = 0; + T item = null; + + //warn: O(n) + for (int i=0; i<items.size(); i++) { + cumulative += items.get(i).getSecond(); + if (cutoff < cumulative) { + item = items.get(i).getFirst(); + break; + } + } + + return item; + } + + /** Selects and returns ceil(ratio * items.length) random items from the given array */ + static <T> List<T> selectRandomItems(T[] items, float ratio) { + Random random = new Random(); + int remaining = (int)Math.ceil(items.length * ratio); + + List<T> selectedItems = new ArrayList<T>(remaining); + + for (int i=0; i<items.length && remaining > 0; i++) { + if (random.nextFloat() < ((float)remaining/(items.length-i))) { + selectedItems.add(items[i]); + remaining--; + } + } + + return selectedItems; + } + + /** + * All actions that deal with RS's, with the following weights (relative probabilities): + * - Restart active master (sleep 5 sec) : 2 + * - Restart random regionserver (sleep 5 sec) : 2 + * - Restart random regionserver (sleep 60 sec) : 2 + * - Restart META regionserver (sleep 5 sec) : 1 + * - Restart ROOT regionserver (sleep 5 sec) : 1 + * - Batch restart of 50% of regionservers (sleep 5 sec) : 2 + * - Rolling restart of 100% of regionservers (sleep 5 sec) : 2 + */ + @SuppressWarnings("unchecked") + private static final List<Pair<Action, Integer>> ALL_ACTIONS = Lists.newArrayList( + new Pair<Action, Integer>(new RestartActiveMaster(FIVE_SEC), 2), + new Pair<Action, Integer>(new RestartRandomRs(FIVE_SEC), 2), + new Pair<Action, Integer>(new RestartRandomRs(ONE_MIN), 2), + new Pair<Action, Integer>(new RestartRsHoldingMeta(FIVE_SEC), 1), + new Pair<Action, Integer>(new RestartRsHoldingRoot(FIVE_SEC), 1), + new Pair<Action, Integer>(new BatchRestartRs(FIVE_SEC, 0.5f), 2), + new Pair<Action, Integer>(new RollingBatchRestartRs(FIVE_SEC, 1.0f), 2) + ); + + public static final String EVERY_MINUTE_RANDOM_ACTION_POLICY = "EVERY_MINUTE_RANDOM_ACTION_POLICY"; + + private Policy[] policies; + private Thread[] monkeyThreads; + + public void start() throws Exception { + monkeyThreads = new Thread[policies.length]; + + for (int i=0; i<policies.length; i++) { + policies[i].init(new PolicyContext(this.util)); + Thread monkeyThread = new Thread(policies[i]); + monkeyThread.start(); + monkeyThreads[i] = monkeyThread; + } + } + + @Override + public void stop(String why) { + for (Policy policy : policies) { + policy.stop(why); + } + } + + @Override + public boolean isStopped() { + return policies[0].isStopped(); + } + + /** + * Wait for ChaosMonkey to stop. + * @throws InterruptedException + */ + public void waitForStop() throws InterruptedException { + for (Thread monkeyThread : monkeyThreads) { + monkeyThread.join(); + } + } + + private static Map<String, Policy> NAMED_POLICIES = Maps.newHashMap(); + static { + NAMED_POLICIES.put(EVERY_MINUTE_RANDOM_ACTION_POLICY, + new PeriodicRandomActionPolicy(ONE_MIN, ALL_ACTIONS)); + } + + @Override + protected void addOptions() { + addOptWithArg("policy", "a named policy defined in ChaosMonkey.java.
Possible values: " + + NAMED_POLICIES.keySet()); + //we can add more options, and make policies more configurable + } + + @Override + protected void processOptions(CommandLine cmd) { + String[] policies = cmd.getOptionValues("policy"); + if (policies != null) { + setPoliciesByName(policies); + } + } + + @Override + protected int doWork() throws Exception { + start(); + waitForStop(); + return 0; + } + + public static void main(String[] args) throws Exception { + Configuration conf = HBaseConfiguration.create(); + IntegrationTestingUtility.setUseDistributedCluster(conf); + IntegrationTestingUtility util = new IntegrationTestingUtility(conf); + util.initializeCluster(1); + + ChaosMonkey monkey = new ChaosMonkey(util, EVERY_MINUTE_RANDOM_ACTION_POLICY); + int ret = ToolRunner.run(conf, monkey, args); + System.exit(ret); + } + +} \ No newline at end of file diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/util/AbstractHBaseTool.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/util/AbstractHBaseTool.java index ab530858d1f..e3633c94fff 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/util/AbstractHBaseTool.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/util/AbstractHBaseTool.java @@ -65,7 +65,7 @@ public abstract class AbstractHBaseTool implements Tool { protected abstract void processOptions(CommandLine cmd); /** The "main function" of the tool */ - protected abstract void doWork() throws Exception; + protected abstract int doWork() throws Exception; @Override public Configuration getConf() { @@ -101,13 +101,14 @@ public abstract class AbstractHBaseTool implements Tool { processOptions(cmd); + int ret = EXIT_FAILURE; try { - doWork(); + ret = doWork(); } catch (Exception e) { LOG.error("Error running command-line tool", e); return EXIT_FAILURE; } - return EXIT_SUCCESS; + return ret; } private boolean sanityCheckOptions(CommandLine cmd) { diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/util/Threads.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/util/Threads.java index 8914fbef1fa..02a05a4e842 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/util/Threads.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/util/Threads.java @@ -128,7 +128,7 @@ public class Threads { /** * @param millis How long to sleep for in milliseconds. */ - public static void sleep(int millis) { + public static void sleep(long millis) { try { Thread.sleep(millis); } catch (InterruptedException e) { @@ -160,15 +160,15 @@ public class Threads { } /** - * Create a new CachedThreadPool with a bounded number as the maximum + * Create a new CachedThreadPool with a bounded number as the maximum * thread size in the pool. - * + * * @param maxCachedThread the maximum thread could be created in the pool * @param timeout the maximum time to wait * @param unit the time unit of the timeout argument * @param threadFactory the factory to use when creating new threads - * @return threadPoolExecutor the cachedThreadPool with a bounded number - * as the maximum thread size in the pool. + * @return threadPoolExecutor the cachedThreadPool with a bounded number + * as the maximum thread size in the pool. 
*/ public static ThreadPoolExecutor getBoundedCachedThreadPool( int maxCachedThread, long timeout, TimeUnit unit, @@ -180,12 +180,12 @@ public class Threads { boundedCachedThreadPool.allowCoreThreadTimeOut(true); return boundedCachedThreadPool; } - - + + /** * Returns a {@link java.util.concurrent.ThreadFactory} that names each * created thread uniquely, with a common prefix. - * + * * @param prefix The prefix of every created Thread's name * @return a {@link java.util.concurrent.ThreadFactory} that names threads */ @@ -193,7 +193,7 @@ public class Threads { return new ThreadFactory() { private final AtomicInteger threadNumber = new AtomicInteger(1); - + @Override public Thread newThread(Runnable r) { return new Thread(r, prefix + threadNumber.getAndIncrement()); diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/HBaseCluster.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/HBaseCluster.java new file mode 100644 index 00000000000..68212761941 --- /dev/null +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/HBaseCluster.java @@ -0,0 +1,280 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase; + +import java.io.Closeable; +import java.io.IOException; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.classification.InterfaceAudience; +import org.apache.hadoop.conf.Configurable; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hbase.client.AdminProtocol; +import org.apache.hadoop.hbase.client.ClientProtocol; +import org.apache.hadoop.hbase.util.Threads; + +/** + * This class defines methods that can help with managing HBase clusters + * from unit tests and system tests. There are 3 types of cluster deployments: + *

<ul> + * <li>MiniHBaseCluster: each server is run in the same JVM in separate threads, + * used by unit tests</li> + * <li>DistributedHBaseCluster: the cluster is pre-deployed, system and integration tests can + * interact with the cluster.</li> + * <li>ProcessBasedLocalHBaseCluster: each server is deployed locally but in separate + * JVMs.</li> + * </ul> + * <p>
+ * HBaseCluster unifies the way tests interact with the cluster, so that the same test can
+ * be run against a mini-cluster during unit test execution, or a distributed cluster having
+ * tens/hundreds of nodes during execution of integration tests.
+ *
+ * <p>
+ * HBaseCluster exposes client-side public interfaces to tests, so that tests do not assume
+ * running in a particular mode. Not all the tests are suitable to be run on an actual cluster,
+ * and some tests will still need to mock stuff and introspect internal state. For those use
+ * cases from unit tests, or if more control is needed, you can use the subclasses directly.
+ * In that sense, this class does not abstract away every interface that
+ * MiniHBaseCluster or DistributedHBaseCluster provide.
+ */
+@InterfaceAudience.Private
+public abstract class HBaseCluster implements Closeable, Configurable {
+  static final Log LOG = LogFactory.getLog(HBaseCluster.class.getName());
+  protected Configuration conf;
+
+  /** the status of the cluster before we begin */
+  protected ClusterStatus initialClusterStatus;
+
+  /**
+   * Construct an HBaseCluster
+   * @param conf Configuration to be used for cluster
+   */
+  public HBaseCluster(Configuration conf) {
+    setConf(conf);
+  }
+
+  @Override
+  public void setConf(Configuration conf) {
+    this.conf = conf;
+  }
+
+  @Override
+  public Configuration getConf() {
+    return conf;
+  }
+
+  /**
+   * Returns a ClusterStatus for this HBase cluster.
+   * @see #getInitialClusterStatus()
+   */
+  public abstract ClusterStatus getClusterStatus() throws IOException;
+
+  /**
+   * Returns a ClusterStatus for this HBase cluster as observed at the
+   * start of the HBaseCluster
+   */
+  public ClusterStatus getInitialClusterStatus() throws IOException {
+    return initialClusterStatus;
+  }
+
+  /**
+   * Returns a {@link MasterAdminProtocol} to the active master
+   */
+  public abstract MasterAdminProtocol getMasterAdmin()
+      throws IOException;
+
+  /**
+   * Returns a {@link MasterMonitorProtocol} to the active master
+   */
+  public abstract MasterMonitorProtocol getMasterMonitor()
+      throws IOException;
+
+  /**
+   * Returns an AdminProtocol interface to the regionserver
+   */
+  public abstract AdminProtocol getAdminProtocol(ServerName serverName) throws IOException;
+
+  /**
+   * Returns a ClientProtocol interface to the regionserver
+   */
+  public abstract ClientProtocol getClientProtocol(ServerName serverName) throws IOException;
+
+  /**
+   * Starts a new region server on the given hostname or if this is a mini/local cluster,
+   * starts a region server locally.
+   * @param hostname the hostname to start the regionserver on
+   * @throws IOException if something goes wrong
+   */
+  public abstract void startRegionServer(String hostname) throws IOException;
+
+  /**
+   * Kills the region server process if this is a distributed cluster, otherwise
+   * this causes the region server to exit doing basic clean up only.
+   * @throws IOException if something goes wrong
+   */
+  public abstract void killRegionServer(ServerName serverName) throws IOException;
+
+  /**
+   * Stops the given region server, by attempting a gradual stop.
+   * @throws IOException if something goes wrong
+   */
+  public abstract void stopRegionServer(ServerName serverName) throws IOException;
+
+  /**
+   * Wait for the specified region server to join the cluster.
+   * @throws IOException if something goes wrong or timeout occurs
+   */
+  public void waitForRegionServerToStart(String hostname, long timeout)
+      throws IOException {
+    long start = System.currentTimeMillis();
+    while ((System.currentTimeMillis() - start) < timeout) {
+      for (ServerName server : getClusterStatus().getServers()) {
+        if (server.getHostname().equals(hostname)) {
+          return;
+        }
+      }
+      Threads.sleep(100);
+    }
+    throw new IOException("timed out waiting for region server to start: " + hostname);
+  }
+
+  /**
+   * Wait for the specified region server to stop the thread / process.
+   * @throws IOException if something goes wrong or timeout occurs
+   */
+  public abstract void waitForRegionServerToStop(ServerName serverName, long timeout)
+      throws IOException;
+
+  /**
+   * Starts a new master on the given hostname or if this is a mini/local cluster,
+   * starts a master locally.
+   * @param hostname the hostname to start the master on
+   * @throws IOException if something goes wrong
+   */
+  public abstract void startMaster(String hostname) throws IOException;
+
+  /**
+   * Kills the master process if this is a distributed cluster, otherwise
+   * this causes master to exit doing basic clean up only.
+   * @throws IOException if something goes wrong
+   */
+  public abstract void killMaster(ServerName serverName) throws IOException;
+
+  /**
+   * Stops the given master, by attempting a gradual stop.
+   * @throws IOException if something goes wrong
+   */
+  public abstract void stopMaster(ServerName serverName) throws IOException;
+
+  /**
+   * Wait for the specified master to stop the thread / process.
+   * @throws IOException if something goes wrong or timeout occurs
+   */
+  public abstract void waitForMasterToStop(ServerName serverName, long timeout)
+      throws IOException;
+
+  /**
+   * Blocks until there is an active master and that master has completed
+   * initialization.
+   *
+   * @return true if an active master becomes available. false if there are no
+   *         masters left.
+   * @throws IOException if something goes wrong or timeout occurs
+   */
+  public boolean waitForActiveAndReadyMaster()
+      throws IOException {
+    return waitForActiveAndReadyMaster(Long.MAX_VALUE);
+  }
+
+  /**
+   * Blocks until there is an active master and that master has completed
+   * initialization.
+   * @param timeout the timeout limit in ms
+   * @return true if an active master becomes available. false if there are no
+   *         masters left.
+   */
+  public abstract boolean waitForActiveAndReadyMaster(long timeout)
+      throws IOException;
+
+  /**
+   * Wait for HBase Cluster to shut down.
+   */
+  public abstract void waitUntilShutDown() throws IOException;
+
+  /**
+   * Shut down the HBase cluster.
+   */
+  public abstract void shutdown() throws IOException;
+
+  /**
+   * Restores the cluster to its initial state if this is a real cluster,
+   * otherwise does nothing.
+   */
+  public void restoreInitialStatus() throws IOException {
+    restoreClusterStatus(getInitialClusterStatus());
+  }
+
+  /**
+   * Restores the cluster to given state if this is a real cluster,
+   * otherwise does nothing.
+ */ + public void restoreClusterStatus(ClusterStatus desiredStatus) throws IOException { + } + + /** + * Get the ServerName of region server serving ROOT region + */ + public ServerName getServerHoldingRoot() throws IOException { + return getServerHoldingRegion(HRegionInfo.ROOT_REGIONINFO.getRegionName()); + } + + /** + * Get the ServerName of region server serving the first META region + */ + public ServerName getServerHoldingMeta() throws IOException { + return getServerHoldingRegion(HRegionInfo.FIRST_META_REGIONINFO.getRegionName()); + } + + /** + * Get the ServerName of region server serving the specified region + * @param regionName Name of the region in bytes + * @return ServerName that hosts the region or null + */ + public abstract ServerName getServerHoldingRegion(byte[] regionName) throws IOException; + + /** + * @return whether we are interacting with a distributed cluster as opposed to an + * in-process mini/local cluster. + */ + public boolean isDistributedCluster() { + return false; + } + + /** + * Closes all the resources held open for this cluster. Note that this call does not shutdown + * the cluster. + * @see #shutdown() + */ + @Override + public abstract void close() throws IOException; +} \ No newline at end of file diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/HBaseTestingUtility.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/HBaseTestingUtility.java index 3f1ffca2d57..a006a9bdc4c 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/HBaseTestingUtility.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/HBaseTestingUtility.java @@ -102,7 +102,9 @@ import org.apache.zookeeper.ZooKeeper; * old HBaseTestCase and HBaseClusterTestCase functionality. * Create an instance and keep it around testing HBase. This class is * meant to be your one-stop shop for anything you might need testing. Manages - * one cluster at a time only. + * one cluster at a time only. Managed cluster can be an in-process + * {@link MiniHBaseCluster}, or a deployed cluster of type {@link DistributedHBaseCluster}. + * Not all methods work with the real cluster. * Depends on log4j being on classpath and * hbase-site.xml for logging and test-run configuration. It does not set * logging levels nor make changes to configuration parameters. @@ -127,7 +129,7 @@ public class HBaseTestingUtility { private boolean passedZkCluster = false; private MiniDFSCluster dfsCluster = null; - private MiniHBaseCluster hbaseCluster = null; + private HBaseCluster hbaseCluster = null; private MiniMRCluster mrCluster = null; /** If there is a mini cluster running for this testing utility instance. 
*/ @@ -230,6 +232,10 @@ public class HBaseTestingUtility { return this.conf; } + public void setHBaseCluster(HBaseCluster hbaseCluster) { + this.hbaseCluster = hbaseCluster; + } + /** * @return Where to write test data on local filesystem; usually * {@link #DEFAULT_BASE_TEST_DIRECTORY} @@ -697,7 +703,7 @@ public class HBaseTestingUtility { getHBaseAdmin(); // create immediately the hbaseAdmin LOG.info("Minicluster is up"); - return this.hbaseCluster; + return (MiniHBaseCluster)this.hbaseCluster; } /** @@ -725,7 +731,11 @@ public class HBaseTestingUtility { * @see #startMiniCluster() */ public MiniHBaseCluster getMiniHBaseCluster() { - return this.hbaseCluster; + if (this.hbaseCluster instanceof MiniHBaseCluster) { + return (MiniHBaseCluster)this.hbaseCluster; + } + throw new RuntimeException(hbaseCluster + " not an instance of " + + MiniHBaseCluster.class.getName()); } /** @@ -764,7 +774,7 @@ public class HBaseTestingUtility { if (this.hbaseCluster != null) { this.hbaseCluster.shutdown(); // Wait till hbase is down before going on to shutdown zk. - this.hbaseCluster.join(); + this.hbaseCluster.waitUntilShutDown(); this.hbaseCluster = null; } } @@ -802,7 +812,7 @@ public class HBaseTestingUtility { * @throws IOException */ public void flush() throws IOException { - this.hbaseCluster.flushcache(); + getMiniHBaseCluster().flushcache(); } /** @@ -810,7 +820,7 @@ public class HBaseTestingUtility { * @throws IOException */ public void flush(byte [] tableName) throws IOException { - this.hbaseCluster.flushcache(tableName); + getMiniHBaseCluster().flushcache(tableName); } /** @@ -818,7 +828,7 @@ public class HBaseTestingUtility { * @throws IOException */ public void compact(boolean major) throws IOException { - this.hbaseCluster.compact(major); + getMiniHBaseCluster().compact(major); } /** @@ -826,7 +836,7 @@ public class HBaseTestingUtility { * @throws IOException */ public void compact(byte [] tableName, boolean major) throws IOException { - this.hbaseCluster.compact(tableName, major); + getMiniHBaseCluster().compact(tableName, major); } @@ -1046,8 +1056,8 @@ public class HBaseTestingUtility { t.flushCommits(); return rowCount; } - - + + /** * Load table of multiple column families with rows from 'aaa' to 'zzz'. * @param t Table @@ -1077,8 +1087,8 @@ public class HBaseTestingUtility { t.flushCommits(); return rowCount; } - - + + /** * Load region with rows from 'aaa' to 'zzz'. * @param r Region @@ -1242,9 +1252,10 @@ public class HBaseTestingUtility { HConnection conn = table.getConnection(); conn.clearRegionCache(); // assign all the new regions IF table is enabled. 
- if (getHBaseAdmin().isTableEnabled(table.getTableName())) { + HBaseAdmin admin = getHBaseAdmin(); + if (admin.isTableEnabled(table.getTableName())) { for(HRegionInfo hri : newRegions) { - hbaseCluster.getMaster().assignRegion(hri); + admin.assign(hri.getRegionName()); } } @@ -1351,8 +1362,8 @@ public class HBaseTestingUtility { Bytes.toString(tableName)); byte [] firstrow = metaRows.get(0); LOG.debug("FirstRow=" + Bytes.toString(firstrow)); - int index = hbaseCluster.getServerWith(firstrow); - return hbaseCluster.getRegionServerThreads().get(index).getRegionServer(); + int index = getMiniHBaseCluster().getServerWith(firstrow); + return getMiniHBaseCluster().getRegionServerThreads().get(index).getRegionServer(); } /** @@ -1468,7 +1479,7 @@ public class HBaseTestingUtility { * @throws Exception */ public void expireMasterSession() throws Exception { - HMaster master = hbaseCluster.getMaster(); + HMaster master = getMiniHBaseCluster().getMaster(); expireSession(master.getZooKeeper(), false); } @@ -1478,7 +1489,7 @@ public class HBaseTestingUtility { * @throws Exception */ public void expireRegionServerSession(int index) throws Exception { - HRegionServer rs = hbaseCluster.getRegionServer(index); + HRegionServer rs = getMiniHBaseCluster().getRegionServer(index); expireSession(rs.getZooKeeper(), false); } @@ -1542,13 +1553,27 @@ public class HBaseTestingUtility { } } - /** - * Get the HBase cluster. + * Get the Mini HBase cluster. * * @return hbase cluster + * @see #getHBaseClusterInterface() */ public MiniHBaseCluster getHBaseCluster() { + return getMiniHBaseCluster(); + } + + /** + * Returns the HBaseCluster instance. + *

+   * <p>Returned object can be any of the subclasses of HBaseCluster, and the
+   * tests referring to this should not assume that the cluster is a mini cluster or a
+   * distributed one. If the test only works on a mini cluster, then the specific
+   * method {@link #getMiniHBaseCluster()} can be used instead w/o the
+   * need to type-cast.
+   */
+  public HBaseCluster getHBaseClusterInterface() {
+    //implementation note: we should rename this method as #getHBaseCluster(),
+    //but this would require refactoring 90+ calls.
     return hbaseCluster;
   }
 
@@ -1739,8 +1764,8 @@ public class HBaseTestingUtility {
   public boolean ensureSomeRegionServersAvailable(final int num)
       throws IOException {
     boolean startedServer = false;
-
-    for (int i=hbaseCluster.getLiveRegionServerThreads().size(); i<num; ++i) {
+    MiniHBaseCluster hbaseCluster = getMiniHBaseCluster();
+    for (int i=hbaseCluster.getLiveRegionServerThreads().size(); i<num; ++i) {
       LOG.info("Started new server=" + hbaseCluster.startRegionServer());
       startedServer = true;
     }
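An aside between the two file diffs: getHBaseClusterInterface() is the seam that lets a single test body drive either cluster flavor. A minimal usage sketch follows (hypothetical code, not part of the patch; the class name and the scenario are invented):

import org.apache.hadoop.hbase.HBaseCluster;
import org.apache.hadoop.hbase.HBaseTestingUtility;
import org.apache.hadoop.hbase.ServerName;

public class MetaRecoverySketch {
  public static void main(String[] args) throws Exception {
    HBaseTestingUtility util = new HBaseTestingUtility();
    util.startMiniCluster(3);
    // Program against HBaseCluster rather than MiniHBaseCluster, so the same
    // logic also runs when the utility is bound to a DistributedHBaseCluster
    // through setHBaseCluster(...).
    HBaseCluster cluster = util.getHBaseClusterInterface();
    ServerName metaHost = cluster.getServerHoldingMeta();
    cluster.killRegionServer(metaHost);                 // simulate a crash
    cluster.waitForRegionServerToStop(metaHost, 60000); // block until it is gone
    cluster.waitForActiveAndReadyMaster(60000);         // master should recover
    util.shutdownMiniCluster();
  }
}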

diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/IntegrationTests.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/IntegrationTests.java
new file mode 100644
--- /dev/null
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/IntegrationTests.java
@@ -0,0 +1,40 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hbase;
+
+/**
+ * Tag a test as 'integration/system' test, meaning that the test class has the following
+ * characteristics:
+ * <ul>
+ *  <li> Possibly takes hours to complete </li>
+ *  <li> Can be run on a mini cluster or an actual cluster </li>
+ *  <li> Can make changes to the given cluster (starting/stopping daemons, etc) </li>
+ *  <li> Should not be run in parallel with other integration tests </li>
+ * </ul>
+ *
+ * Integration / System tests should have a class name starting with "IntegrationTest", and
+ * should be annotated with @Category(IntegrationTests.class). Integration tests can be run
+ * using the IntegrationTestsDriver class or from mvn verify.
+ *
+ * @see SmallTests
+ * @see MediumTests
+ * @see LargeTests
+ */
+public interface IntegrationTests {
+}
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/LargeTests.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/LargeTests.java
index c4d3befa271..958ffd71c7e 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/LargeTests.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/LargeTests.java
@@ -32,6 +32,7 @@
  *
  * @see SmallTests
  * @see MediumTests
+ * @see IntegrationTests
  */
 public interface LargeTests {
 }
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/MediumTests.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/MediumTests.java
index e99f2922dce..a51a2c9be6e 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/MediumTests.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/MediumTests.java
@@ -31,6 +31,7 @@
  *
  * @see SmallTests
  * @see LargeTests
+ * @see IntegrationTests
  */
 public interface MediumTests {
 }
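Before moving on to the MiniHBaseCluster changes, here is how the IntegrationTests category defined above is meant to be consumed. A hedged sketch (the test class and body are invented; it assumes IntegrationTestingUtility extends HBaseTestingUtility, as its use in ChaosMonkey's main suggests):

import org.apache.hadoop.hbase.HBaseCluster;
import org.apache.hadoop.hbase.IntegrationTestingUtility;
import org.apache.hadoop.hbase.IntegrationTests;
import org.junit.Test;
import org.junit.experimental.categories.Category;

@Category(IntegrationTests.class) // picked up by IntegrationTestsDriver and mvn verify
public class IntegrationTestExample {
  @Test
  public void testClusterSurvivesServerLoss() throws Exception {
    IntegrationTestingUtility util = new IntegrationTestingUtility();
    // Spins up a mini cluster, or binds to an already-deployed one when
    // IntegrationTestingUtility.setUseDistributedCluster(conf) was applied.
    util.initializeCluster(3);
    HBaseCluster cluster = util.getHBaseClusterInterface();
    // exercise the cluster here: kill/restart servers, load data, verify state.
  }
}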
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/MiniHBaseCluster.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/MiniHBaseCluster.java
index fa0afc0e572..8f7f3aa02a0 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/MiniHBaseCluster.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/MiniHBaseCluster.java
@@ -29,6 +29,8 @@ import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.classification.InterfaceStability;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.hbase.client.AdminProtocol;
+import org.apache.hadoop.hbase.client.ClientProtocol;
 import org.apache.hadoop.hbase.client.HConnectionManager;
 import org.apache.hadoop.hbase.master.HMaster;
 import org.apache.hadoop.hbase.protobuf.generated.RegionServerStatusProtos.RegionServerStartupResponse;
@@ -36,11 +38,10 @@ import org.apache.hadoop.hbase.regionserver.HRegion;
 import org.apache.hadoop.hbase.regionserver.HRegionServer;
 import org.apache.hadoop.hbase.security.User;
 import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.hbase.util.JVMClusterUtil;
 import org.apache.hadoop.hbase.util.JVMClusterUtil.MasterThread;
 import org.apache.hadoop.hbase.util.JVMClusterUtil.RegionServerThread;
-import org.apache.hadoop.hbase.util.JVMClusterUtil;
 import org.apache.hadoop.hbase.util.Threads;
-import org.apache.hadoop.io.MapWritable;
 
 /**
  * This class creates a single process HBase cluster.
@@ -50,9 +51,8 @@
  */
 @InterfaceAudience.Public
 @InterfaceStability.Evolving
-public class MiniHBaseCluster {
+public class MiniHBaseCluster extends HBaseCluster {
   static final Log LOG = LogFactory.getLog(MiniHBaseCluster.class.getName());
-  private Configuration conf;
   public LocalHBaseCluster hbaseCluster;
   private static int index;
@@ -77,18 +77,17 @@ public class MiniHBaseCluster {
   public MiniHBaseCluster(Configuration conf, int numMasters, int numRegionServers)
       throws IOException, InterruptedException {
-    this.conf = conf;
-    conf.set(HConstants.MASTER_PORT, "0");
-    init(numMasters, numRegionServers, null, null);
+    this(conf, numMasters, numRegionServers, null, null);
   }
 
   public MiniHBaseCluster(Configuration conf, int numMasters, int numRegionServers,
       Class<? extends HMaster> masterClass,
       Class<? extends MiniHBaseCluster.MiniHBaseClusterRegionServer> regionserverClass)
       throws IOException, InterruptedException {
-    this.conf = conf;
+    super(conf);
     conf.set(HConstants.MASTER_PORT, "0");
     init(numMasters, numRegionServers, masterClass, regionserverClass);
+    this.initialClusterStatus = getClusterStatus();
   }
 
   public Configuration getConfiguration() {
@@ -229,6 +228,54 @@ public class MiniHBaseCluster {
     }
   }
 
+  @Override
+  public void startRegionServer(String hostname) throws IOException {
+    this.startRegionServer();
+  }
+
+  @Override
+  public void killRegionServer(ServerName serverName) throws IOException {
+    HRegionServer server = getRegionServer(getRegionServerIndex(serverName));
+    if (server instanceof MiniHBaseClusterRegionServer) {
+      LOG.info("Killing " + server.toString());
+      ((MiniHBaseClusterRegionServer) server).kill();
+    } else {
+      abortRegionServer(getRegionServerIndex(serverName));
+    }
+  }
+
+  @Override
+  public void stopRegionServer(ServerName serverName) throws IOException {
+    stopRegionServer(getRegionServerIndex(serverName));
+  }
+
+  @Override
+  public void waitForRegionServerToStop(ServerName serverName, long timeout) throws IOException {
+    //ignore timeout for now
+    waitOnRegionServer(getRegionServerIndex(serverName));
+  }
+
+  @Override
+  public void startMaster(String hostname) throws IOException {
+    this.startMaster();
+  }
+
+  @Override
+  public void killMaster(ServerName serverName) throws IOException {
+    abortMaster(getMasterIndex(serverName));
+  }
+
+  @Override
+  public void stopMaster(ServerName serverName) throws IOException {
+    stopMaster(getMasterIndex(serverName));
+  }
+
+  @Override
+  public void waitForMasterToStop(ServerName serverName, long timeout) throws IOException {
+    //ignore timeout for now
+    waitOnMaster(getMasterIndex(serverName));
+  }
+
   /**
    * Starts a region server thread running
    *
@@ -324,6 +371,16 @@ public class MiniHBaseCluster {
     return t;
   }
 
+  @Override
+  public MasterAdminProtocol getMasterAdmin() {
+    return this.hbaseCluster.getActiveMaster();
+  }
+
+  @Override
+  public MasterMonitorProtocol getMasterMonitor() {
+    return this.hbaseCluster.getActiveMaster();
+  }
+
   /**
    * Returns the current active master, if available.
    * @return the active HMaster, null if none is active.
@@ -398,15 +455,18 @@ public class MiniHBaseCluster {
    * masters left.
    * @throws InterruptedException
    */
-  public boolean waitForActiveAndReadyMaster() throws InterruptedException {
+  public boolean waitForActiveAndReadyMaster(long timeout) throws IOException {
     List<JVMClusterUtil.MasterThread> mts;
-    while (!(mts = getMasterThreads()).isEmpty()) {
+    long start = System.currentTimeMillis();
+    while (!(mts = getMasterThreads()).isEmpty()
+        && (System.currentTimeMillis() - start) < timeout) {
       for (JVMClusterUtil.MasterThread mt : mts) {
         if (mt.getMaster().isActiveMaster() && mt.getMaster().isInitialized()) {
           return true;
         }
       }
-      Thread.sleep(100);
+
+      Threads.sleep(100);
     }
     return false;
   }
@@ -443,6 +503,16 @@
     HConnectionManager.deleteAllConnections(false);
   }
 
+  @Override
+  public void close() throws IOException {
+  }
+
+  @Override
+  public ClusterStatus getClusterStatus() throws IOException {
+    HMaster master = getMaster();
+    return master == null ? null : master.getClusterStatus();
+  }
+
   /**
    * Call flushCache on all regions on all participating regionservers.
    * @throws IOException
@@ -565,6 +635,15 @@
     return index;
   }
 
+  @Override
+  public ServerName getServerHoldingRegion(byte[] regionName) throws IOException {
+    int index = getServerWith(regionName);
+    if (index < 0) {
+      return null;
+    }
+    return getRegionServer(index).getServerName();
+  }
+
   /**
    * Counts the total numbers of regions being served by the currently online
    * region servers by asking each how many regions they have. Does not look
@@ -591,4 +670,40 @@
       masterThread.getMaster().abort("killAll", new Throwable());
     }
   }
+
+  @Override
+  public void waitUntilShutDown() {
+    this.hbaseCluster.join();
+  }
+
+  protected int getRegionServerIndex(ServerName serverName) {
+    //we have a small number of region servers, this should be fine for now.
+    List<RegionServerThread> servers = getRegionServerThreads();
+    for (int i=0; i < servers.size(); i++) {
+      if (servers.get(i).getRegionServer().getServerName().equals(serverName)) {
+        return i;
+      }
+    }
+    return -1;
+  }
+
+  protected int getMasterIndex(ServerName serverName) {
+    List<MasterThread> masters = getMasterThreads();
+    for (int i = 0; i < masters.size(); i++) {
+      if (masters.get(i).getMaster().getServerName().equals(serverName)) {
+        return i;
+      }
+    }
+    return -1;
+  }
+
+  @Override
+  public AdminProtocol getAdminProtocol(ServerName serverName) throws IOException {
+    return getRegionServer(getRegionServerIndex(serverName));
+  }
+
+  @Override
+  public ClientProtocol getClientProtocol(ServerName serverName) throws IOException {
+    return getRegionServer(getRegionServerIndex(serverName));
+  }
 }
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/SmallTests.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/SmallTests.java
index d68a118fb22..6953667d699 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/SmallTests.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/SmallTests.java
@@ -28,6 +28,7 @@
  *
  * @see MediumTests
  * @see LargeTests
+ * @see IntegrationTests
  */
 public interface SmallTests {
 }
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/TestCheckTestClasses.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/TestCheckTestClasses.java
index d917bdb7429..1bc33c33fd0 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/TestCheckTestClasses.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/TestCheckTestClasses.java
@@ -18,11 +18,11 @@
 package org.apache.hadoop.hbase;
 
-import org.junit.Test;
-import org.junit.experimental.categories.Category;
-import org.junit.runners.Suite;
+import static junit.framework.Assert.assertNotNull;
+import static org.junit.Assert.assertTrue;
 
 import java.io.File;
+import java.io.FileFilter;
 import java.io.IOException;
 import java.lang.reflect.Method;
 import java.lang.reflect.Modifier;
@@ -31,8 +31,9 @@
 import java.util.ArrayList;
 import java.util.Enumeration;
 import java.util.List;
 
-import static junit.framework.Assert.assertNotNull;
-import static org.junit.Assert.assertTrue;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+import org.junit.runners.Suite;
 
 /**
@@ -41,8 +42,21 @@
 @Category(SmallTests.class)
 public class TestCheckTestClasses {
 
+  private FileFilter TEST_CLASS_FILE_FILTER = new FileFilter() {
+    @Override
+    public boolean accept(File file) {
+      return file.isDirectory() || isTestClassFile(file);
+    }
+    private boolean isTestClassFile(File file) {
+      String fileName = file.getName();
+      return fileName.endsWith(".class")
+          && (fileName.startsWith("Test") || fileName.startsWith("IntegrationTest"));
+    }
+  };
+
   /**
-   * Throws an assertion if we find a test class without category (small/medium/large).
+   * Throws an assertion if we find a test class without category (small/medium/large/integration).
    * List all the test classes without category in the assertion message.
    */
   @Test
@@ -50,7 +64,7 @@ public class TestCheckTestClasses {
     List<Class<?>> badClasses = new java.util.ArrayList<Class<?>>();
 
     for (Class<?> c : findTestClasses()) {
-      if (!existCategoryAnnotation(c)) {
+      if (!existCategoryAnnotation(c, null)) {
         badClasses.add(c);
       }
     }
@@ -59,9 +73,22 @@ public class TestCheckTestClasses {
       + badClasses, badClasses.isEmpty());
   }
 
+  /** Returns whether the class has @Category annotation having the xface value.
+   */
+  private boolean existCategoryAnnotation(Class<?> c, Class<?> xface) {
+    Category category = c.getAnnotation(Category.class);
 
-  private boolean existCategoryAnnotation(Class<?> c) {
-    return (c.getAnnotation(Category.class) != null);
+    if (category != null) {
+      if (xface == null) {
+        return true;
+      }
+      for (Class<?> cc : category.value()) {
+        if (cc.equals(xface)) {
+          return true;
+        }
+      }
+    }
+    return false;
   }
 
   /*
@@ -88,6 +115,19 @@
     return false;
   }
 
+  /**
+   * Finds test classes which are annotated with @Category having xface value
+   * @param xface the @Category value
+   */
+  public List<Class<?>> findTestClasses(Class<?> xface) throws ClassNotFoundException, IOException {
+    List<Class<?>> classes = new ArrayList<Class<?>>();
+    for (Class<?> c : findTestClasses()) {
+      if (existCategoryAnnotation(c, xface)) {
+        classes.add(c);
+      }
+    }
+    return classes;
+  }
 
   private List<Class<?>> findTestClasses() throws ClassNotFoundException, IOException {
     final String packageName = "org.apache.hadoop.hbase";
@@ -117,14 +157,14 @@
       return classes;
     }
 
-    File[] files = baseDirectory.listFiles();
+    File[] files = baseDirectory.listFiles(TEST_CLASS_FILE_FILTER);
     assertNotNull(files);
 
     for (File file : files) {
       final String fileName = file.getName();
       if (file.isDirectory()) {
         classes.addAll(findTestClasses(file, packageName + "." + fileName));
-      } else if (fileName.endsWith(".class") && fileName.startsWith("Test")) {
+      } else {
         Class<?> c = Class.forName(
           packageName + '.' + fileName.substring(0, fileName.length() - 6),
           false,
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/TestMasterFailover.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/TestMasterFailover.java
index 63d96a32d8d..c780eb76539 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/TestMasterFailover.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/TestMasterFailover.java
@@ -34,7 +34,19 @@
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.hbase.*;
+import org.apache.hadoop.hbase.Abortable;
+import org.apache.hadoop.hbase.ClusterStatus;
+import org.apache.hadoop.hbase.HBaseConfiguration;
+import org.apache.hadoop.hbase.HBaseTestingUtility;
+import org.apache.hadoop.hbase.HColumnDescriptor;
+import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.HRegionInfo;
+import org.apache.hadoop.hbase.HTableDescriptor;
+import org.apache.hadoop.hbase.LargeTests;
+import org.apache.hadoop.hbase.MasterNotRunningException;
+import org.apache.hadoop.hbase.MiniHBaseCluster;
+import org.apache.hadoop.hbase.RegionTransition;
+import org.apache.hadoop.hbase.ServerName;
 import org.apache.hadoop.hbase.executor.EventHandler.EventType;
 import org.apache.hadoop.hbase.protobuf.ProtobufUtil;
 import org.apache.hadoop.hbase.regionserver.HRegion;
@@ -42,9 +54,9 @@
 import org.apache.hadoop.hbase.regionserver.HRegionServer;
 import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.hbase.util.FSTableDescriptors;
 import org.apache.hadoop.hbase.util.JVMClusterUtil;
-import org.apache.hadoop.hbase.util.Threads;
 import org.apache.hadoop.hbase.util.JVMClusterUtil.MasterThread;
 import org.apache.hadoop.hbase.util.JVMClusterUtil.RegionServerThread;
+import org.apache.hadoop.hbase.util.Threads;
 import org.apache.hadoop.hbase.zookeeper.ZKAssign;
 import org.apache.hadoop.hbase.zookeeper.ZKTable;
 import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher;
@@ -219,7 +231,7 @@ public class TestMasterFailover {
     enabledAndAssignedRegions.add(enabledRegions.remove(0));
     enabledAndAssignedRegions.add(enabledRegions.remove(0));
     enabledAndAssignedRegions.add(closingRegion);
-
+
     List<HRegionInfo> disabledAndAssignedRegions = new ArrayList<HRegionInfo>();
     disabledAndAssignedRegions.add(disabledRegions.remove(0));
     disabledAndAssignedRegions.add(disabledRegions.remove(0));
@@ -457,18 +469,18 @@ public class TestMasterFailover {
     // Create a ZKW to use in the test
     ZooKeeperWatcher zkw = new ZooKeeperWatcher(TEST_UTIL.getConfiguration(),
       "unittest", new Abortable() {
-
+
       @Override
       public void abort(String why, Throwable e) {
         LOG.error("Fatal ZK Error: " + why, e);
         org.junit.Assert.assertFalse("Fatal ZK error", true);
       }
-
+
       @Override
       public boolean isAborted() {
         return false;
       }
-
+
     });
 
     // get all the master threads
@@ -895,8 +907,8 @@ public class TestMasterFailover {
     TEST_UTIL.shutdownMiniHBaseCluster();
 
     // Create a ZKW to use in the test
-    ZooKeeperWatcher zkw =
-      HBaseTestingUtility.createAndForceNodeToOpenedState(TEST_UTIL,
+    ZooKeeperWatcher zkw =
+      HBaseTestingUtility.createAndForceNodeToOpenedState(TEST_UTIL,
         metaRegion, regionServer.getServerName());
 
     LOG.info("Starting cluster for second time");
@@ -1042,10 +1054,10 @@ public class TestMasterFailover {
    * @param cluster
    * @return the new active master
    * @throws InterruptedException
-   * @throws MasterNotRunningException
+   * @throws IOException
    */
   private HMaster killActiveAndWaitForNewActive(MiniHBaseCluster cluster)
-      throws InterruptedException, MasterNotRunningException {
+      throws InterruptedException, IOException {
     int activeIndex = getActiveMasterIndex(cluster);
     HMaster active = cluster.getMaster();
     cluster.stopMaster(activeIndex);
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestEndToEndSplitTransaction.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestEndToEndSplitTransaction.java
index 19ea995b152..8b45bb0b347 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestEndToEndSplitTransaction.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestEndToEndSplitTransaction.java
@@ -57,6 +57,7 @@ import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.ScanRequest;
 import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.hbase.util.Pair;
 import org.apache.hadoop.hbase.util.PairOfSameType;
+import org.apache.hadoop.hbase.util.StoppableImplementation;
 import org.apache.hadoop.hbase.util.Threads;
 import org.junit.AfterClass;
 import org.junit.BeforeClass;
@@ -179,7 +180,7 @@ public class TestEndToEndSplitTransaction {
     //for daughters.
HTable table = TEST_UTIL.createTable(TABLENAME, FAMILY); - Stoppable stopper = new SimpleStoppable(); + Stoppable stopper = new StoppableImplementation(); RegionSplitter regionSplitter = new RegionSplitter(table); RegionChecker regionChecker = new RegionChecker(conf, stopper, TABLENAME); @@ -202,20 +203,6 @@ public class TestEndToEndSplitTransaction { regionChecker.verify(); } - private static class SimpleStoppable implements Stoppable { - volatile boolean stopped = false; - - @Override - public void stop(String why) { - this.stopped = true; - } - - @Override - public boolean isStopped() { - return stopped; - } - } - static class RegionSplitter extends Thread { Throwable ex; HTable table; diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/util/LoadTestTool.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/util/LoadTestTool.java index 9fa7acd4bf9..4b81b3bcf77 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/util/LoadTestTool.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/util/LoadTestTool.java @@ -289,7 +289,7 @@ public class LoadTestTool extends AbstractHBaseTool { } @Override - protected void doWork() throws IOException { + protected int doWork() throws IOException { if (cmd.hasOption(OPT_ZK_QUORUM)) { conf.set(HConstants.ZOOKEEPER_QUORUM, cmd.getOptionValue(OPT_ZK_QUORUM)); } @@ -335,6 +335,16 @@ public class LoadTestTool extends AbstractHBaseTool { if (isRead) { readerThreads.waitForFinish(); } + + boolean success = true; + if (isWrite) { + success = success && writerThreads.getNumWriteFailures() == 0; + } + if (isRead) { + success = success && readerThreads.getNumReadErrors() == 0 + && readerThreads.getNumReadFailures() == 0; + } + return success ? 0 : 1; } public static void main(String[] args) { diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/util/RestartMetaTest.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/util/RestartMetaTest.java index 3128dc67ac3..a2f3d287491 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/util/RestartMetaTest.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/util/RestartMetaTest.java @@ -86,7 +86,7 @@ public class RestartMetaTest extends AbstractHBaseTool { } @Override - protected void doWork() throws Exception { + protected int doWork() throws Exception { ProcessBasedLocalHBaseCluster hbaseCluster = new ProcessBasedLocalHBaseCluster(conf, NUM_DATANODES, numRegionServers); hbaseCluster.startMiniDFS(); @@ -128,6 +128,7 @@ public class RestartMetaTest extends AbstractHBaseTool { + Bytes.toStringBinary(result.getFamilyMap(HConstants.CATALOG_FAMILY) .get(HConstants.SERVER_QUALIFIER))); } + return 0; } @Override diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/util/StoppableImplementation.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/util/StoppableImplementation.java new file mode 100644 index 00000000000..51a22f3d240 --- /dev/null +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/util/StoppableImplementation.java @@ -0,0 +1,40 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.hbase.util; + +import org.apache.hadoop.classification.InterfaceAudience; +import org.apache.hadoop.hbase.Stoppable; + +/** + * A base implementation for a Stoppable service + */ +@InterfaceAudience.Private +public class StoppableImplementation implements Stoppable { + volatile boolean stopped = false; + + @Override + public void stop(String why) { + this.stopped = true; + } + + @Override + public boolean isStopped() { + return stopped; + } +} \ No newline at end of file
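To close, a short sketch of how the newly extracted StoppableImplementation is typically wired up, mirroring the RegionChecker usage in TestEndToEndSplitTransaction above (the worker loop here is invented for illustration):

import org.apache.hadoop.hbase.Stoppable;
import org.apache.hadoop.hbase.util.StoppableImplementation;

public class StoppableSketch {
  public static void main(String[] args) throws InterruptedException {
    final Stoppable stopper = new StoppableImplementation();
    Thread checker = new Thread(new Runnable() {
      @Override
      public void run() {
        // The background verifier polls the flag instead of being interrupted.
        while (!stopper.isStopped()) {
          // ... verify an invariant, sleep briefly ...
        }
      }
    });
    checker.start();
    // ... drive the workload under test ...
    stopper.stop("test finished"); // cooperative shutdown
    checker.join();
  }
}

Handing a shared Stoppable to helpers keeps shutdown cooperative: each worker exits at a safe point instead of being torn down mid-verification.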