HBASE-6241 HBaseCluster interface for interacting with the cluster from system tests

git-svn-id: https://svn.apache.org/repos/asf/hbase/trunk@1385024 13f79535-47bb-0310-9956-ffa450edef68

parent 802f264f4e
commit f125ca952c
@@ -349,7 +349,7 @@ fi
 # Exec unless HBASE_NOEXEC is set.
 if [ "${HBASE_NOEXEC}" != "" ]; then
-  "$JAVA" -XX:OnOutOfMemoryError="kill -9 %p" $JAVA_HEAP_MAX $HBASE_OPTS -classpath "$CLASSPATH" $CLASS "$@"
+  "$JAVA" -Dproc_$COMMAND -XX:OnOutOfMemoryError="kill -9 %p" $JAVA_HEAP_MAX $HBASE_OPTS -classpath "$CLASSPATH" $CLASS "$@"
 else
-  exec "$JAVA" -XX:OnOutOfMemoryError="kill -9 %p" $JAVA_HEAP_MAX $HBASE_OPTS -classpath "$CLASSPATH" $CLASS "$@"
+  exec "$JAVA" -Dproc_$COMMAND -XX:OnOutOfMemoryError="kill -9 %p" $JAVA_HEAP_MAX $HBASE_OPTS -classpath "$CLASSPATH" $CLASS "$@"
 fi
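Note: the new -Dproc_$COMMAND flag is what makes each daemon findable from the outside. The HBaseClusterManager added later in this patch greps for the proc_ marker over SSH to locate pids. A minimal sketch of that dependency (the class and service name are illustrative):

    // Sketch: why the proc_ marker matters. The pid-discovery pipeline that
    // HBaseClusterManager (below) sends over SSH only finds the daemon because
    // bin/hbase now tags the JVM command line with -Dproc_<command>.
    public class ProcMarkerExample {
      public static void main(String[] args) {
        String service = "regionserver"; // illustrative service name
        String findPid = String.format(
            "ps aux | grep proc_%s | grep -v grep | tr -s ' ' | cut -d ' ' -f2", service);
        // prints: ps aux | grep proc_regionserver | grep -v grep | tr -s ' ' | cut -d ' ' -f2
        System.out.println(findPid);
      }
    }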
@@ -38,6 +38,12 @@
   </properties>

   <build>
+    <testResources>
+      <testResource>
+        <!-- We depend on hbase-server test resources -->
+        <directory>../hbase-server/src/test/resources</directory>
+      </testResource>
+    </testResources>
     <pluginManagement>
       <plugins>
         <plugin>
@@ -66,16 +72,20 @@
             <DYLD_LIBRARY_PATH>${env.DYLD_LIBRARY_PATH}:${project.build.directory}/nativelib</DYLD_LIBRARY_PATH>
             <MALLOC_ARENA_MAX>4</MALLOC_ARENA_MAX>
           </environmentVariables>
+          <failIfNoTests>false</failIfNoTests>
+          <testFailureIgnore>false</testFailureIgnore>
         </configuration>
         <executions>
           <execution>
             <id>integration-test</id>
+            <phase>integration-test</phase>
             <goals>
               <goal>integration-test</goal>
             </goals>
           </execution>
           <execution>
             <id>verify</id>
+            <phase>verify</phase>
             <goals>
               <goal>verify</goal>
             </goals>
@@ -93,6 +103,12 @@
           <configuration>
             <skip>false</skip>
             <forkMode>always</forkMode>
+            <!-- TODO: failsafe does timeout, but verify does not fail the build because of the timeout.
+                 I believe it is a failsafe bug, we may consider using surefire -->
+            <forkedProcessTimeoutInSeconds>1800</forkedProcessTimeoutInSeconds>
+            <argLine>-enableassertions -Xmx1900m
+              -Djava.security.egd=file:/dev/./urandom</argLine>
+            <testFailureIgnore>false</testFailureIgnore>
           </configuration>
         </plugin>
       </plugins>
@@ -0,0 +1,132 @@
/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.hadoop.hbase;

import java.io.IOException;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.conf.Configured;

/**
 * ClusterManager is an API to manage servers in a distributed environment. It provides services
 * for starting / stopping / killing Hadoop/HBase daemons. Concrete implementations provide actual
 * functionality for carrying out deployment-specific tasks.
 */
@InterfaceAudience.Private
public abstract class ClusterManager extends Configured {
  protected static final Log LOG = LogFactory.getLog(ClusterManager.class);

  private static final String SIGKILL = "SIGKILL";
  private static final String SIGSTOP = "SIGSTOP";
  private static final String SIGCONT = "SIGCONT";

  public ClusterManager() {
  }

  /**
   * Type of the service daemon
   */
  public static enum ServiceType {
    HADOOP_NAMENODE("namenode"),
    HADOOP_DATANODE("datanode"),
    HADOOP_JOBTRACKER("jobtracker"),
    HADOOP_TASKTRACKER("tasktracker"),
    HBASE_MASTER("master"),
    HBASE_REGIONSERVER("regionserver");

    private String name;

    ServiceType(String name) {
      this.name = name;
    }

    public String getName() {
      return name;
    }

    @Override
    public String toString() {
      return getName();
    }
  }

  /**
   * Start the service on the given host
   */
  public abstract void start(ServiceType service, String hostname) throws IOException;

  /**
   * Stop the service on the given host
   */
  public abstract void stop(ServiceType service, String hostname) throws IOException;

  /**
   * Restart the service on the given host
   */
  public abstract void restart(ServiceType service, String hostname) throws IOException;

  /**
   * Send the given POSIX signal to the service
   */
  public abstract void signal(ServiceType service, String signal,
      String hostname) throws IOException;

  /**
   * Kill the service running on the given host
   */
  public void kill(ServiceType service, String hostname) throws IOException {
    signal(service, SIGKILL, hostname);
  }

  /**
   * Suspend the service running on the given host
   */
  public void suspend(ServiceType service, String hostname) throws IOException {
    signal(service, SIGSTOP, hostname);
  }

  /**
   * Resume the service running on the given host
   */
  public void resume(ServiceType service, String hostname) throws IOException {
    signal(service, SIGCONT, hostname);
  }

  /**
   * Returns whether the service is running on the remote host. This only checks whether the
   * service still has a pid.
   */
  public abstract boolean isRunning(ServiceType service, String hostname) throws IOException;

  /* TODO: further API ideas:
   *
   * //return services running on host:
   * ServiceType[] getRunningServicesOnHost(String hostname);
   *
   * //return which services can be run on host (for example, to query whether hmaster can run on this host)
   * ServiceType[] getRunnableServicesOnHost(String hostname);
   *
   * //return which hosts can run this service
   * String[] getRunnableHostsForService(ServiceType service);
   */
}
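To make the contract above concrete, here is a minimal no-op implementation sketch; the class name and behavior are hypothetical and not part of this patch:

    package org.apache.hadoop.hbase;

    import java.io.IOException;

    // Hypothetical ClusterManager that only logs: illustrates the abstract
    // contract without touching real daemons.
    public class LoggingClusterManager extends ClusterManager {
      @Override
      public void start(ServiceType service, String hostname) throws IOException {
        LOG.info("would start " + service + " on " + hostname);
      }
      @Override
      public void stop(ServiceType service, String hostname) throws IOException {
        LOG.info("would stop " + service + " on " + hostname);
      }
      @Override
      public void restart(ServiceType service, String hostname) throws IOException {
        stop(service, hostname);
        start(service, hostname);
      }
      @Override
      public void signal(ServiceType service, String signal, String hostname) throws IOException {
        LOG.info("would send " + signal + " to " + service + " on " + hostname);
      }
      @Override
      public boolean isRunning(ServiceType service, String hostname) throws IOException {
        return true; // pretend everything is always up
      }
    }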
@@ -0,0 +1,287 @@
/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase;

import java.io.IOException;
import java.util.HashMap;

import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.ClusterManager.ServiceType;
import org.apache.hadoop.hbase.client.AdminProtocol;
import org.apache.hadoop.hbase.client.ClientProtocol;
import org.apache.hadoop.hbase.client.HBaseAdmin;
import org.apache.hadoop.hbase.client.HConnection;
import org.apache.hadoop.hbase.client.HConnectionManager;
import org.apache.hadoop.hbase.protobuf.ProtobufUtil;
import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.ServerInfo;
import org.apache.hadoop.hbase.util.Threads;

import com.google.common.collect.Sets;

/**
 * Manages the interactions with an already deployed distributed cluster (as opposed to
 * a pseudo-distributed, or mini/local cluster). This is used by integration and system tests.
 */
@InterfaceAudience.Private
public class DistributedHBaseCluster extends HBaseCluster {

  private HBaseAdmin admin;

  private ClusterManager clusterManager;

  public DistributedHBaseCluster(Configuration conf, ClusterManager clusterManager)
      throws IOException {
    super(conf);
    this.clusterManager = clusterManager;
    this.admin = new HBaseAdmin(conf);
    this.initialClusterStatus = getClusterStatus();
  }

  public void setClusterManager(ClusterManager clusterManager) {
    this.clusterManager = clusterManager;
  }

  public ClusterManager getClusterManager() {
    return clusterManager;
  }

  /**
   * Returns a ClusterStatus for this HBase cluster
   * @throws IOException
   */
  @Override
  public ClusterStatus getClusterStatus() throws IOException {
    return admin.getClusterStatus();
  }

  @Override
  public ClusterStatus getInitialClusterStatus() throws IOException {
    return initialClusterStatus;
  }

  @Override
  public void close() throws IOException {
    if (this.admin != null) {
      admin.close();
    }
  }

  @Override
  public AdminProtocol getAdminProtocol(ServerName serverName) throws IOException {
    return admin.getConnection().getAdmin(serverName.getHostname(), serverName.getPort());
  }

  @Override
  public ClientProtocol getClientProtocol(ServerName serverName) throws IOException {
    return admin.getConnection().getClient(serverName.getHostname(), serverName.getPort());
  }

  @Override
  public void startRegionServer(String hostname) throws IOException {
    LOG.info("Starting RS on: " + hostname);
    clusterManager.start(ServiceType.HBASE_REGIONSERVER, hostname);
  }

  @Override
  public void killRegionServer(ServerName serverName) throws IOException {
    LOG.info("Aborting RS: " + serverName.getServerName());
    clusterManager.kill(ServiceType.HBASE_REGIONSERVER, serverName.getHostname());
  }

  @Override
  public void stopRegionServer(ServerName serverName) throws IOException {
    LOG.info("Stopping RS: " + serverName.getServerName());
    clusterManager.stop(ServiceType.HBASE_REGIONSERVER, serverName.getHostname());
  }

  @Override
  public void waitForRegionServerToStop(ServerName serverName, long timeout) throws IOException {
    waitForServiceToStop(ServiceType.HBASE_REGIONSERVER, serverName, timeout);
  }

  private void waitForServiceToStop(ServiceType service, ServerName serverName, long timeout)
      throws IOException {
    LOG.info("Waiting for service: " + service + " to stop: " + serverName.getServerName());
    long start = System.currentTimeMillis();

    while ((System.currentTimeMillis() - start) < timeout) {
      if (!clusterManager.isRunning(service, serverName.getHostname())) {
        return;
      }
      Threads.sleep(1000);
    }
    throw new IOException("Timed out waiting for service to stop: " + serverName);
  }

  @Override
  public MasterAdminProtocol getMasterAdmin() throws IOException {
    HConnection conn = HConnectionManager.getConnection(conf);
    return conn.getMasterAdmin();
  }

  @Override
  public MasterMonitorProtocol getMasterMonitor() throws IOException {
    HConnection conn = HConnectionManager.getConnection(conf);
    return conn.getMasterMonitor();
  }

  @Override
  public void startMaster(String hostname) throws IOException {
    LOG.info("Starting Master on: " + hostname);
    clusterManager.start(ServiceType.HBASE_MASTER, hostname);
  }

  @Override
  public void killMaster(ServerName serverName) throws IOException {
    LOG.info("Aborting Master: " + serverName.getServerName());
    clusterManager.kill(ServiceType.HBASE_MASTER, serverName.getHostname());
  }

  @Override
  public void stopMaster(ServerName serverName) throws IOException {
    LOG.info("Stopping Master: " + serverName.getServerName());
    clusterManager.stop(ServiceType.HBASE_MASTER, serverName.getHostname());
  }

  @Override
  public void waitForMasterToStop(ServerName serverName, long timeout) throws IOException {
    waitForServiceToStop(ServiceType.HBASE_MASTER, serverName, timeout);
  }

  @Override
  public boolean waitForActiveAndReadyMaster(long timeout) throws IOException {
    long start = System.currentTimeMillis();
    while (System.currentTimeMillis() - start < timeout) {
      try {
        getMasterAdmin();
        return true;
      } catch (MasterNotRunningException m) {
        LOG.warn("Master not started yet " + m);
      } catch (ZooKeeperConnectionException e) {
        LOG.warn("Failed to connect to ZK " + e);
      }
      Threads.sleep(1000);
    }
    return false;
  }

  @Override
  public ServerName getServerHoldingRegion(byte[] regionName) throws IOException {
    HConnection connection = admin.getConnection();
    HRegionLocation regionLoc = connection.locateRegion(regionName);
    if (regionLoc == null) {
      return null;
    }

    AdminProtocol client = connection.getAdmin(regionLoc.getHostname(), regionLoc.getPort());
    ServerInfo info = ProtobufUtil.getServerInfo(client);
    return ProtobufUtil.toServerName(info.getServerName());
  }

  @Override
  public void waitUntilShutDown() {
    //Simply wait for a few seconds for now (after issuing serverManager.kill
    throw new RuntimeException("Not implemented yet");
  }

  @Override
  public void shutdown() throws IOException {
    //not sure we want this
    throw new RuntimeException("Not implemented yet");
  }

  @Override
  public boolean isDistributedCluster() {
    return true;
  }

  @Override
  public void restoreClusterStatus(ClusterStatus initial) throws IOException {
    //TODO: caution: not tested thoroughly
    ClusterStatus current = getClusterStatus();

    //restore masters

    //check whether the current master has changed
    if (!ServerName.isSameHostnameAndPort(initial.getMaster(), current.getMaster())) {
      //master has changed, we would like to undo this.
      //1. Kill the current backups
      //2. Stop current master
      //3. Start a master at the initial hostname (if not already running as backup)
      //4. Start backup masters
      boolean foundOldMaster = false;
      for (ServerName currentBackup : current.getBackupMasters()) {
        if (!ServerName.isSameHostnameAndPort(currentBackup, initial.getMaster())) {
          stopMaster(currentBackup);
        } else {
          foundOldMaster = true;
        }
      }
      stopMaster(current.getMaster());
      if (!foundOldMaster) { //if the initial master is not already running as a backup
        startMaster(initial.getMaster().getHostname());
      }
      waitForActiveAndReadyMaster(); //wait so that the active master takes over

      //start backup masters
      for (ServerName backup : initial.getBackupMasters()) {
        //these are not started in backup mode, but we should already have an active master
        startMaster(backup.getHostname());
      }
    } else {
      //current master has not changed, match up backup masters
      HashMap<String, ServerName> initialBackups = new HashMap<String, ServerName>();
      HashMap<String, ServerName> currentBackups = new HashMap<String, ServerName>();

      for (ServerName server : initial.getBackupMasters()) {
        initialBackups.put(server.getHostname(), server);
      }
      for (ServerName server : current.getBackupMasters()) {
        currentBackups.put(server.getHostname(), server);
      }

      for (String hostname : Sets.difference(initialBackups.keySet(), currentBackups.keySet())) {
        startMaster(hostname);
      }

      for (String hostname : Sets.difference(currentBackups.keySet(), initialBackups.keySet())) {
        stopMaster(currentBackups.get(hostname));
      }
    }

    //restore region servers
    HashMap<String, ServerName> initialServers = new HashMap<String, ServerName>();
    HashMap<String, ServerName> currentServers = new HashMap<String, ServerName>();

    for (ServerName server : initial.getServers()) {
      initialServers.put(server.getHostname(), server);
    }
    for (ServerName server : current.getServers()) {
      currentServers.put(server.getHostname(), server);
    }

    for (String hostname : Sets.difference(initialServers.keySet(), currentServers.keySet())) {
      startRegionServer(hostname);
    }

    for (String hostname : Sets.difference(currentServers.keySet(), initialServers.keySet())) {
      stopRegionServer(currentServers.get(hostname));
    }
  }
}
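For reference, a sketch of how a test might drive a deployed cluster through this class, assuming passwordless SSH and the HBase scripts on every host (which is what HBaseClusterManager, below, requires); the wrapper class is illustrative:

    package org.apache.hadoop.hbase;

    import org.apache.hadoop.conf.Configuration;

    // Hypothetical driver showing the flow through DistributedHBaseCluster.
    public class DistributedClusterExample {
      public static void main(String[] args) throws Exception {
        Configuration conf = HBaseConfiguration.create();
        DistributedHBaseCluster cluster =
            new DistributedHBaseCluster(conf, new HBaseClusterManager());

        // Pick any live region server, kill it, wait for death, then restart it.
        ServerName rs = cluster.getClusterStatus().getServers().iterator().next();
        cluster.killRegionServer(rs);                 // SIGKILL delivered over SSH
        cluster.waitForRegionServerToStop(rs, 60000); // polls ClusterManager.isRunning()
        cluster.startRegionServer(rs.getHostname());  // hbase-daemon.sh start regionserver

        // Undo whatever the test did to the cluster topology.
        cluster.restoreClusterStatus(cluster.getInitialClusterStatus());
        cluster.close();
      }
    }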
@@ -0,0 +1,213 @@
/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.hadoop.hbase;

import java.io.File;
import java.io.IOException;
import java.util.Map;

import org.apache.commons.lang.StringUtils;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.hbase.HBaseClusterManager.CommandProvider.Operation;
import org.apache.hadoop.hbase.util.Pair;
import org.apache.hadoop.util.Shell;

/**
 * A default cluster manager for HBase. Uses SSH and HBase shell scripts
 * to manage the cluster. Assumes Unix-like commands are available, like 'ps',
 * 'kill', etc. Also assumes the user running the test has enough "power" to start & stop
 * servers on the remote machines (for example, the test user could be the same user as the
 * user the daemon is running as)
 */
@InterfaceAudience.Private
public class HBaseClusterManager extends ClusterManager {

  /**
   * Executes commands over SSH
   */
  static class RemoteShell extends Shell.ShellCommandExecutor {

    private String hostname;

    private String sshCmd = "/usr/bin/ssh";
    private String sshOptions = System.getenv("HBASE_SSH_OPTS"); //from conf/hbase-env.sh

    public RemoteShell(String hostname, String[] execString, File dir, Map<String, String> env,
        long timeout) {
      super(execString, dir, env, timeout);
      this.hostname = hostname;
    }

    public RemoteShell(String hostname, String[] execString, File dir, Map<String, String> env) {
      super(execString, dir, env);
      this.hostname = hostname;
    }

    public RemoteShell(String hostname, String[] execString, File dir) {
      super(execString, dir);
      this.hostname = hostname;
    }

    public RemoteShell(String hostname, String[] execString) {
      super(execString);
      this.hostname = hostname;
    }

    @Override
    protected String[] getExecString() {
      return new String[] {
        "bash", "-c",
        StringUtils.join(new String[] { sshCmd,
          sshOptions == null ? "" : sshOptions,
          hostname,
          "\"" + StringUtils.join(super.getExecString(), " ") + "\""
        }, " ")};
    }

    @Override
    public void execute() throws IOException {
      super.execute();
    }

    public void setSshCmd(String sshCmd) {
      this.sshCmd = sshCmd;
    }

    public void setSshOptions(String sshOptions) {
      this.sshOptions = sshOptions;
    }

    public String getSshCmd() {
      return sshCmd;
    }

    public String getSshOptions() {
      return sshOptions;
    }
  }

  /**
   * Provides command strings for services to be executed by Shell. CommandProviders are
   * pluggable, and different deployments (Windows, Bigtop, etc.) can be managed by
   * plugging in custom CommandProviders or ClusterManagers.
   */
  static abstract class CommandProvider {

    enum Operation {
      START, STOP, RESTART
    }

    public abstract String getCommand(ServiceType service, Operation op);

    public String isRunningCommand(ServiceType service) {
      return findPidCommand(service);
    }

    protected String findPidCommand(ServiceType service) {
      return String.format("ps aux | grep proc_%s | grep -v grep | tr -s ' ' | cut -d ' ' -f2",
          service);
    }

    public String signalCommand(ServiceType service, String signal) {
      return String.format("%s | xargs kill -s %s", findPidCommand(service), signal);
    }
  }

  /**
   * CommandProvider to manage the service using bin/hbase-* scripts
   */
  static class HBaseShellCommandProvider extends CommandProvider {
    private String getHBaseHome() {
      return System.getenv("HBASE_HOME");
    }

    private String getConfig() {
      String confDir = System.getenv("HBASE_CONF_DIR");
      if (confDir != null) {
        return String.format("--config %s", confDir);
      }
      return "";
    }

    @Override
    public String getCommand(ServiceType service, Operation op) {
      return String.format("%s/bin/hbase-daemon.sh %s %s %s", getHBaseHome(), getConfig(),
          op.toString().toLowerCase(), service);
    }
  }

  public HBaseClusterManager() {
    super();
  }

  protected CommandProvider getCommandProvider(ServiceType service) {
    //TODO: make it pluggable, or auto-detect the best command provider; should work with
    //hadoop daemons as well
    return new HBaseShellCommandProvider();
  }

  /**
   * Execute the given command on the host using SSH
   * @return pair of exit code and command output
   * @throws IOException if something goes wrong.
   */
  private Pair<Integer, String> exec(String hostname, String... cmd) throws IOException {
    LOG.info("Executing remote command: " + StringUtils.join(cmd, " ") + " , hostname:" + hostname);

    RemoteShell shell = new RemoteShell(hostname, cmd);
    shell.execute();

    LOG.info("Executed remote command, exit code:" + shell.getExitCode()
        + " , output:" + shell.getOutput());

    return new Pair<Integer, String>(shell.getExitCode(), shell.getOutput());
  }

  private void exec(String hostname, ServiceType service, Operation op) throws IOException {
    exec(hostname, getCommandProvider(service).getCommand(service, op));
  }

  @Override
  public void start(ServiceType service, String hostname) throws IOException {
    exec(hostname, service, Operation.START);
  }

  @Override
  public void stop(ServiceType service, String hostname) throws IOException {
    exec(hostname, service, Operation.STOP);
  }

  @Override
  public void restart(ServiceType service, String hostname) throws IOException {
    exec(hostname, service, Operation.RESTART);
  }

  @Override
  public void signal(ServiceType service, String signal, String hostname) throws IOException {
    exec(hostname, getCommandProvider(service).signalCommand(service, signal));
  }

  @Override
  public boolean isRunning(ServiceType service, String hostname) throws IOException {
    String ret = exec(hostname, getCommandProvider(service).isRunningCommand(service))
        .getSecond();
    return ret.length() > 0;
  }

}
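It may help to see the strings these pieces compose. A same-package sketch (not part of the patch) that only prints the generated commands; the HBASE_HOME/HBASE_CONF_DIR values in the comments are examples:

    package org.apache.hadoop.hbase;

    // Hypothetical same-package helper: prints the shell strings the providers
    // generate, without executing anything remotely.
    public class CommandProviderExample {
      public static void main(String[] args) {
        HBaseClusterManager.CommandProvider provider =
            new HBaseClusterManager.HBaseShellCommandProvider();

        // With HBASE_HOME=/opt/hbase and HBASE_CONF_DIR=/etc/hbase/conf, prints:
        //   /opt/hbase/bin/hbase-daemon.sh --config /etc/hbase/conf start master
        System.out.println(provider.getCommand(ClusterManager.ServiceType.HBASE_MASTER,
            HBaseClusterManager.CommandProvider.Operation.START));

        // Prints the SIGSTOP pipeline built on the proc_ marker:
        //   ps aux | grep proc_regionserver | grep -v grep | tr -s ' ' | cut -d ' ' -f2 | xargs kill -s SIGSTOP
        System.out.println(provider.signalCommand(ClusterManager.ServiceType.HBASE_REGIONSERVER,
            "SIGSTOP"));
      }
    }

RemoteShell then wraps any such command for remote execution as: bash -c "ssh $HBASE_SSH_OPTS <hostname> \"<command>\"".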
@@ -0,0 +1,133 @@
/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.hadoop.hbase;

import java.io.IOException;

import junit.framework.Assert;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.ChaosMonkey;
import org.apache.hadoop.hbase.util.LoadTestTool;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
import org.junit.experimental.categories.Category;

/**
 * A system test which does large data ingestion and verifies it using {@link LoadTestTool},
 * while killing the region servers and the master(s) randomly. You can configure how long
 * the load test should run by using the "hbase.IntegrationTestDataIngestWithChaosMonkey.runtime"
 * configuration parameter.
 */
@Category(IntegrationTests.class)
public class IntegrationTestDataIngestWithChaosMonkey {

  private static final String TABLE_NAME = "TestDataIngestWithChaosMonkey";
  private static final int NUM_SLAVES_BASE = 4; //number of slaves for the smallest cluster

  /** A soft limit on how long we should run */
  private static final String RUN_TIME_KEY = "hbase.IntegrationTestDataIngestWithChaosMonkey.runtime";

  //run for 5 min by default
  private static final long DEFAULT_RUN_TIME = 5 * 60 * 1000;

  private static final Log LOG = LogFactory.getLog(IntegrationTestDataIngestWithChaosMonkey.class);
  private IntegrationTestingUtility util;
  private HBaseCluster cluster;
  private ChaosMonkey monkey;

  @Before
  public void setUp() throws Exception {
    util = new IntegrationTestingUtility();

    util.initializeCluster(NUM_SLAVES_BASE);

    cluster = util.getHBaseClusterInterface();
    deleteTableIfNecessary();

    monkey = new ChaosMonkey(util, ChaosMonkey.EVERY_MINUTE_RANDOM_ACTION_POLICY);
    monkey.start();
  }

  @After
  public void tearDown() throws Exception {
    monkey.stop("test has finished, that's why");
    monkey.waitForStop();
    util.restoreCluster();
  }

  private void deleteTableIfNecessary() throws IOException {
    if (util.getHBaseAdmin().tableExists(TABLE_NAME)) {
      util.deleteTable(Bytes.toBytes(TABLE_NAME));
    }
  }

  @Test
  public void testDataIngest() throws Exception {
    LOG.info("Running testDataIngest");
    LOG.info("Cluster size:" + util.getHBaseClusterInterface().getClusterStatus().getServersSize());

    LoadTestTool loadTool = new LoadTestTool();
    loadTool.setConf(util.getConfiguration());

    long start = System.currentTimeMillis();
    long runtime = util.getConfiguration().getLong(RUN_TIME_KEY, DEFAULT_RUN_TIME);
    long startKey = 0;

    long numKeys = estimateDataSize();
    while (System.currentTimeMillis() - start < 0.9 * runtime) {
      LOG.info("Intended run time: " + (runtime / 60000) + " min, left:" +
          ((runtime - (System.currentTimeMillis() - start)) / 60000) + " min");

      int ret = loadTool.run(new String[] {
          "-tn", TABLE_NAME,
          "-write", "10:100:20",
          "-start_key", String.valueOf(startKey),
          "-num_keys", String.valueOf(numKeys)
      });

      //assert that load was successful
      Assert.assertEquals(0, ret);

      ret = loadTool.run(new String[] {
          "-tn", TABLE_NAME,
          "-read", "100:20",
          "-start_key", String.valueOf(startKey),
          "-num_keys", String.valueOf(numKeys)
      });

      //assert that verify was successful
      Assert.assertEquals(0, ret);
      startKey += numKeys;
    }
  }

  /** Estimates a data size based on the cluster size */
  protected long estimateDataSize() throws IOException {
    //base is a 4 slave node cluster.
    ClusterStatus status = cluster.getClusterStatus();
    int numRegionServers = status.getServersSize();
    int multiplication = Math.max(1, numRegionServers / NUM_SLAVES_BASE);

    return 10000 * multiplication;
  }
}
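The runtime knob deserves a call-out: it is a plain Configuration property, so a driver can adjust the soft time limit without modifying the test. A small sketch (the wrapper class is hypothetical):

    package org.apache.hadoop.hbase;

    import org.apache.hadoop.conf.Configuration;

    // Sketch: the ingest loop's duration is read above via
    // conf.getLong(RUN_TIME_KEY, DEFAULT_RUN_TIME), so it can be tuned here.
    public class RuntimeOverrideExample {
      public static void main(String[] args) {
        Configuration conf = HBaseConfiguration.create();
        conf.setLong("hbase.IntegrationTestDataIngestWithChaosMonkey.runtime",
            10 * 60 * 1000L); // 10 minutes instead of the 5-minute default
        // The same key can also be set in hbase-site.xml, which the test's
        // Configuration loads through HBaseConfiguration.create().
      }
    }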
@@ -0,0 +1,130 @@
/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.hadoop.hbase;

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;

/**
 * Facility for <strong>integration/system</strong> tests. This extends {@link HBaseTestingUtility}
 * and adds in the functionality needed by integration and system tests. This class understands
 * distributed and pseudo-distributed/local cluster deployments, and abstracts those from the tests
 * in this module.
 * <p>
 * IntegrationTestingUtility is constructed and used by the integration tests, but the tests
 * themselves should not assume a particular deployment. They can rely on the methods in this
 * class and HBaseCluster. Before the testing begins, the test should initialize the cluster by
 * calling {@link #initializeCluster(int)}.
 * <p>
 * The cluster that is used defaults to a mini cluster, but it can be forced to use a distributed
 * cluster by calling {@link #setUseDistributedCluster(Configuration)}. This method is invoked by
 * test drivers (maven, IntegrationTestsDriver, etc) before initializing the cluster
 * via {@link #initializeCluster(int)}. Individual tests should not directly call
 * {@link #setUseDistributedCluster(Configuration)}.
 */
public class IntegrationTestingUtility extends HBaseTestingUtility {

  public IntegrationTestingUtility() {
    this(HBaseConfiguration.create());
  }

  public IntegrationTestingUtility(Configuration conf) {
    super(conf);
  }

  /**
   * Configuration that controls whether this utility assumes a running/deployed cluster.
   * This is different than "hbase.cluster.distributed" since that parameter indicates whether the
   * cluster is in an actual distributed environment, while this shows that there is a
   * deployed (distributed or pseudo-distributed) cluster running, and we do not need to
   * start a mini-cluster for tests.
   */
  public static final String IS_DISTRIBUTED_CLUSTER = "hbase.test.cluster.distributed";

  /**
   * Initializes the state of the cluster. It starts a new in-process mini cluster, OR
   * if we are given an already deployed distributed cluster it initializes the state.
   * @param numSlaves Number of slaves to start up if we are booting a mini cluster. Otherwise
   * we check whether this many nodes are available and throw an exception if not.
   */
  public void initializeCluster(int numSlaves) throws Exception {
    if (isDistributedCluster()) {
      createDistributedHBaseCluster();
      checkNodeCount(numSlaves);
    } else {
      startMiniCluster(numSlaves);
    }
  }

  /**
   * Checks whether we have at least numSlaves nodes. Throws an
   * exception otherwise.
   */
  public void checkNodeCount(int numSlaves) throws Exception {
    HBaseCluster cluster = getHBaseClusterInterface();
    if (cluster.getClusterStatus().getServers().size() < numSlaves) {
      throw new Exception("Cluster does not have enough nodes:" + numSlaves);
    }
  }

  /**
   * Restores the cluster to the initial state if it is a distributed cluster; otherwise,
   * shuts down the mini cluster.
   */
  public void restoreCluster() throws IOException {
    if (isDistributedCluster()) {
      getHBaseClusterInterface().restoreInitialStatus();
    } else {
      getMiniHBaseCluster().shutdown();
    }
  }

  /**
   * Sets the configuration property to use a distributed cluster for the integration tests.
   * Test drivers should use this to enforce cluster deployment.
   */
  public static void setUseDistributedCluster(Configuration conf) {
    conf.setBoolean(IS_DISTRIBUTED_CLUSTER, true);
    System.setProperty(IS_DISTRIBUTED_CLUSTER, "true");
  }

  /**
   * @return whether we are interacting with a distributed cluster as opposed to an in-process
   * mini cluster or a local cluster.
   * @see IntegrationTestingUtility#setUseDistributedCluster(Configuration)
   */
  private boolean isDistributedCluster() {
    Configuration conf = getConfiguration();
    boolean isDistributedCluster =
        Boolean.parseBoolean(System.getProperty(IS_DISTRIBUTED_CLUSTER, "false"));
    if (!isDistributedCluster) {
      isDistributedCluster = conf.getBoolean(IS_DISTRIBUTED_CLUSTER, false);
    }
    return isDistributedCluster;
  }

  private void createDistributedHBaseCluster() throws IOException {
    Configuration conf = getConfiguration();
    ClusterManager clusterManager = new HBaseClusterManager();
    setHBaseCluster(new DistributedHBaseCluster(conf, clusterManager));
    getHBaseAdmin();
  }

}
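Distilling the lifecycle this utility expects into a skeleton (the test name and body are hypothetical):

    package org.apache.hadoop.hbase;

    import java.io.IOException;

    import org.junit.After;
    import org.junit.Before;
    import org.junit.Test;

    // Skeleton of a deployment-agnostic integration test built on this utility.
    public class ExampleIntegrationTest {
      private IntegrationTestingUtility util;

      @Before
      public void setUp() throws Exception {
        util = new IntegrationTestingUtility();
        util.initializeCluster(3); // mini cluster, unless a driver called
      }                            // setUseDistributedCluster(conf) beforehand

      @Test
      public void testSomething() throws Exception {
        HBaseCluster cluster = util.getHBaseClusterInterface();
        // ... exercise the cluster through the deployment-agnostic HBaseCluster API
      }

      @After
      public void tearDown() throws IOException {
        util.restoreCluster(); // restores a distributed cluster, or shuts down the mini one
      }
    }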
@@ -0,0 +1,73 @@
/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.hadoop.hbase;

import java.io.IOException;
import java.util.List;

import org.apache.commons.cli.CommandLine;
import org.apache.hadoop.hbase.util.AbstractHBaseTool;
import org.apache.hadoop.util.ToolRunner;
import org.junit.internal.TextListener;
import org.junit.runner.JUnitCore;
import org.junit.runner.Result;

/**
 * This class drives the Integration test suite execution. Executes all
 * tests having @Category(IntegrationTests.class) annotation against an
 * already deployed distributed cluster.
 */
public class IntegrationTestsDriver extends AbstractHBaseTool {

  public static void main(String[] args) throws Exception {
    int ret = ToolRunner.run(new IntegrationTestsDriver(), args);
    System.exit(ret);
  }

  @Override
  protected void addOptions() {
  }

  @Override
  protected void processOptions(CommandLine cmd) {
  }

  /**
   * Returns test classes annotated with @Category(IntegrationTests.class)
   */
  private Class<?>[] findIntegrationTestClasses() throws ClassNotFoundException, IOException {
    TestCheckTestClasses util = new TestCheckTestClasses();
    List<Class<?>> classes = util.findTestClasses(IntegrationTests.class);
    return classes.toArray(new Class<?>[classes.size()]);
  }

  @Override
  protected int doWork() throws Exception {
    //this is called from the command line, so we should set to use the distributed cluster
    IntegrationTestingUtility.setUseDistributedCluster(conf);

    JUnitCore junit = new JUnitCore();
    junit.addListener(new TextListener(System.out));
    Result result = junit.run(findIntegrationTestClasses());

    return result.wasSuccessful() ? 0 : 1;
  }

}
@ -0,0 +1,563 @@
|
||||||
|
/**
|
||||||
|
* Licensed to the Apache Software Foundation (ASF) under one
|
||||||
|
* or more contributor license agreements. See the NOTICE file
|
||||||
|
* distributed with this work for additional information
|
||||||
|
* regarding copyright ownership. The ASF licenses this file
|
||||||
|
* to you under the Apache License, Version 2.0 (the
|
||||||
|
* "License"); you may not use this file except in compliance
|
||||||
|
* with the License. You may obtain a copy of the License at
|
||||||
|
*
|
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
*
|
||||||
|
* Unless required by applicable law or agreed to in writing, software
|
||||||
|
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
* See the License for the specific language governing permissions and
|
||||||
|
* limitations under the License.
|
||||||
|
*/
|
||||||
|
|
||||||
|
package org.apache.hadoop.hbase.util;
|
||||||
|
|
||||||
|
import java.io.IOException;
|
||||||
|
import java.util.ArrayList;
|
||||||
|
import java.util.Collection;
|
||||||
|
import java.util.LinkedList;
|
||||||
|
import java.util.List;
|
||||||
|
import java.util.Map;
|
||||||
|
import java.util.Queue;
|
||||||
|
import java.util.Random;
|
||||||
|
|
||||||
|
import org.apache.commons.cli.CommandLine;
|
||||||
|
import org.apache.commons.logging.Log;
|
||||||
|
import org.apache.commons.logging.LogFactory;
|
||||||
|
import org.apache.hadoop.conf.Configuration;
|
||||||
|
import org.apache.hadoop.hbase.ClusterStatus;
|
||||||
|
import org.apache.hadoop.hbase.HBaseCluster;
|
||||||
|
import org.apache.hadoop.hbase.HBaseConfiguration;
|
||||||
|
import org.apache.hadoop.hbase.IntegrationTestingUtility;
|
||||||
|
import org.apache.hadoop.hbase.IntegrationTestDataIngestWithChaosMonkey;
|
||||||
|
import org.apache.hadoop.hbase.ServerName;
|
||||||
|
import org.apache.hadoop.hbase.Stoppable;
|
||||||
|
import org.apache.hadoop.util.StringUtils;
|
||||||
|
import org.apache.hadoop.util.ToolRunner;
|
||||||
|
|
||||||
|
import com.google.common.collect.Lists;
|
||||||
|
import com.google.common.collect.Maps;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* A utility to injects faults in a running cluster.
|
||||||
|
* <p>
|
||||||
|
* ChaosMonkey defines Action's and Policy's. Actions are sequences of events, like
|
||||||
|
* - Select a random server to kill
|
||||||
|
* - Sleep for 5 sec
|
||||||
|
* - Start the server on the same host
|
||||||
|
* Actions can also be complex events, like rolling restart of all of the servers.
|
||||||
|
* <p>
|
||||||
|
* Policies on the other hand are responsible for executing the actions based on a strategy.
|
||||||
|
* The default policy is to execute a random action every minute based on predefined action
|
||||||
|
* weights. ChaosMonkey executes predefined named policies until it is stopped. More than one
|
||||||
|
* policy can be active at any time.
|
||||||
|
* <p>
|
||||||
|
* Chaos monkey can be run from the command line, or can be invoked from integration tests.
|
||||||
|
* See {@link IntegrationTestDataIngestWithChaosMonkey} or other integration tests that use
|
||||||
|
* chaos monkey for code examples.
|
||||||
|
* <p>
|
||||||
|
* ChaosMonkey class is indeed inspired by the Netflix's same-named tool:
|
||||||
|
* http://techblog.netflix.com/2012/07/chaos-monkey-released-into-wild.html
|
||||||
|
*/
|
||||||
|
public class ChaosMonkey extends AbstractHBaseTool implements Stoppable {
|
||||||
|
|
||||||
|
private static final Log LOG = LogFactory.getLog(ChaosMonkey.class);
|
||||||
|
|
||||||
|
private static final long ONE_SEC = 1000;
|
||||||
|
private static final long FIVE_SEC = 5 * ONE_SEC;
|
||||||
|
private static final long ONE_MIN = 60 * ONE_SEC;
|
||||||
|
private static final long TIMEOUT = ONE_MIN;
|
||||||
|
|
||||||
|
final IntegrationTestingUtility util;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Construct a new ChaosMonkey
|
||||||
|
* @param util the HBaseIntegrationTestingUtility already configured
|
||||||
|
* @param policies names of pre-defined policies to use
|
||||||
|
*/
|
||||||
|
public ChaosMonkey(IntegrationTestingUtility util, String... policies) {
|
||||||
|
this.util = util;
|
||||||
|
setPoliciesByName(policies);
|
||||||
|
}
|
||||||
|
|
||||||
|
private void setPoliciesByName(String... policies) {
|
||||||
|
this.policies = new Policy[policies.length];
|
||||||
|
for (int i=0; i < policies.length; i++) {
|
||||||
|
this.policies[i] = NAMED_POLICIES.get(policies[i]);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Context for Action's
|
||||||
|
*/
|
||||||
|
private static class ActionContext {
|
||||||
|
private IntegrationTestingUtility util;
|
||||||
|
|
||||||
|
ActionContext(IntegrationTestingUtility util) {
|
||||||
|
this.util = util;
|
||||||
|
}
|
||||||
|
|
||||||
|
IntegrationTestingUtility getHaseIntegrationTestingUtility() {
|
||||||
|
return util;
|
||||||
|
}
|
||||||
|
|
||||||
|
HBaseCluster getHBaseCluster() {
|
||||||
|
return util.getHBaseClusterInterface();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* A (possibly mischievous) action that the ChaosMonkey can perform.
|
||||||
|
*/
|
||||||
|
private static class Action {
|
||||||
|
long sleepTime; //how long should we sleep
|
||||||
|
ActionContext context;
|
||||||
|
HBaseCluster cluster;
|
||||||
|
ClusterStatus initialStatus;
|
||||||
|
ServerName[] initialServers;
|
||||||
|
|
||||||
|
public Action(long sleepTime) {
|
||||||
|
this.sleepTime = sleepTime;
|
||||||
|
}
|
||||||
|
|
||||||
|
void init(ActionContext context) throws Exception {
|
||||||
|
this.context = context;
|
||||||
|
cluster = context.getHBaseCluster();
|
||||||
|
initialStatus = cluster.getInitialClusterStatus();
|
||||||
|
Collection<ServerName> regionServers = initialStatus.getServers();
|
||||||
|
initialServers = regionServers.toArray(new ServerName[regionServers.size()]);
|
||||||
|
}
|
||||||
|
|
||||||
|
void perform() throws Exception { };
|
||||||
|
|
||||||
|
/** Returns current region servers */
|
||||||
|
ServerName[] getCurrentServers() throws IOException {
|
||||||
|
Collection<ServerName> regionServers = cluster.getClusterStatus().getServers();
|
||||||
|
return regionServers.toArray(new ServerName[regionServers.size()]);
|
||||||
|
}
|
||||||
|
|
||||||
|
void killMaster(ServerName server) throws IOException {
|
||||||
|
LOG.info("Killing master:" + server);
|
||||||
|
cluster.killMaster(server);
|
||||||
|
cluster.waitForMasterToStop(server, TIMEOUT);
|
||||||
|
LOG.info("Killed master server:" + server);
|
||||||
|
}
|
||||||
|
|
||||||
|
void startMaster(ServerName server) throws IOException {
|
||||||
|
LOG.info("Starting master:" + server.getHostname());
|
||||||
|
cluster.startMaster(server.getHostname());
|
||||||
|
cluster.waitForActiveAndReadyMaster(TIMEOUT);
|
||||||
|
LOG.info("Started master: " + server);
|
||||||
|
}
|
||||||
|
|
||||||
|
void restartMaster(ServerName server, long sleepTime) throws IOException {
|
||||||
|
killMaster(server);
|
||||||
|
sleep(sleepTime);
|
||||||
|
startMaster(server);
|
||||||
|
}
|
||||||
|
|
||||||
|
void killRs(ServerName server) throws IOException {
|
||||||
|
LOG.info("Killing region server:" + server);
|
||||||
|
cluster.killRegionServer(server);
|
||||||
|
cluster.waitForRegionServerToStop(server, TIMEOUT);
|
||||||
|
LOG.info("Killed region server:" + server + ". Reported num of rs:"
|
||||||
|
+ cluster.getClusterStatus().getServersSize());
|
||||||
|
}
|
||||||
|
|
||||||
|
void startRs(ServerName server) throws IOException {
|
||||||
|
LOG.info("Starting region server:" + server.getHostname());
|
||||||
|
cluster.startRegionServer(server.getHostname());
|
||||||
|
cluster.waitForRegionServerToStart(server.getHostname(), TIMEOUT);
|
||||||
|
LOG.info("Started region server:" + server + ". Reported num of rs:"
|
||||||
|
+ cluster.getClusterStatus().getServersSize());
|
||||||
|
}
|
||||||
|
|
||||||
|
void sleep(long sleepTime) {
|
||||||
|
LOG.info("Sleeping for:" + sleepTime);
|
||||||
|
Threads.sleep(sleepTime);
|
||||||
|
}
|
||||||
|
|
||||||
|
void restartRs(ServerName server, long sleepTime) throws IOException {
|
||||||
|
killRs(server);
|
||||||
|
sleep(sleepTime);
|
||||||
|
startRs(server);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
private static class RestartActiveMaster extends Action {
|
||||||
|
public RestartActiveMaster(long sleepTime) {
|
||||||
|
super(sleepTime);
|
||||||
|
}
|
||||||
|
@Override
|
||||||
|
void perform() throws Exception {
|
||||||
|
LOG.info("Performing action: Restart active master");
|
||||||
|
|
||||||
|
ServerName master = cluster.getClusterStatus().getMaster();
|
||||||
|
restartMaster(master, sleepTime);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
private static class RestartRandomRs extends Action {
|
||||||
|
public RestartRandomRs(long sleepTime) {
|
||||||
|
super(sleepTime);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
void init(ActionContext context) throws Exception {
|
||||||
|
super.init(context);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
void perform() throws Exception {
|
||||||
|
LOG.info("Performing action: Restart random region server");
|
||||||
|
ServerName server = selectRandomItem(getCurrentServers());
|
||||||
|
|
||||||
|
restartRs(server, sleepTime);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
private static class RestartRsHoldingMeta extends RestartRandomRs {
|
||||||
|
public RestartRsHoldingMeta(long sleepTime) {
|
||||||
|
super(sleepTime);
|
||||||
|
}
|
||||||
|
@Override
|
||||||
|
void perform() throws Exception {
|
||||||
|
LOG.info("Performing action: Restart region server holding META");
|
||||||
|
ServerName server = cluster.getServerHoldingMeta();
|
||||||
|
if (server == null) {
|
||||||
|
LOG.warn("No server is holding .META. right now.");
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
restartRs(server, sleepTime);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
private static class RestartRsHoldingRoot extends RestartRandomRs {
|
||||||
|
public RestartRsHoldingRoot(long sleepTime) {
|
||||||
|
super(sleepTime);
|
||||||
|
}
|
||||||
|
@Override
|
||||||
|
void perform() throws Exception {
|
||||||
|
LOG.info("Performing action: Restart region server holding ROOT");
|
||||||
|
ServerName server = cluster.getServerHoldingMeta();
|
||||||
|
if (server == null) {
|
||||||
|
LOG.warn("No server is holding -ROOT- right now.");
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
restartRs(server, sleepTime);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
  /**
   * Restarts a ratio of the running regionservers at the same time
   */
  private static class BatchRestartRs extends Action {
    float ratio; //ratio of regionservers to restart

    public BatchRestartRs(long sleepTime, float ratio) {
      super(sleepTime);
      this.ratio = ratio;
    }

    @Override
    void init(ActionContext context) throws Exception {
      super.init(context);
    }

    @Override
    void perform() throws Exception {
      LOG.info(String.format("Performing action: Batch restarting %d%% of region servers",
          (int)(ratio * 100)));
      List<ServerName> selectedServers = selectRandomItems(getCurrentServers(), ratio);

      for (ServerName server : selectedServers) {
        LOG.info("Killing region server:" + server);
        cluster.killRegionServer(server);
      }

      for (ServerName server : selectedServers) {
        cluster.waitForRegionServerToStop(server, TIMEOUT);
      }

      LOG.info("Killed " + selectedServers.size() + " region servers. Reported num of rs:"
          + cluster.getClusterStatus().getServersSize());

      sleep(sleepTime);

      for (ServerName server : selectedServers) {
        LOG.info("Starting region server:" + server.getHostname());
        cluster.startRegionServer(server.getHostname());
      }
      for (ServerName server : selectedServers) {
        cluster.waitForRegionServerToStart(server.getHostname(), TIMEOUT);
      }
      LOG.info("Started " + selectedServers.size() + " region servers. Reported num of rs:"
          + cluster.getClusterStatus().getServersSize());
    }
  }

  /**
   * Restarts a ratio of the regionservers in a rolling fashion. At each step, either kills a
   * server, or starts one, sleeping randomly (0-sleepTime) in between steps.
   */
  private static class RollingBatchRestartRs extends BatchRestartRs {
    public RollingBatchRestartRs(long sleepTime, float ratio) {
      super(sleepTime, ratio);
    }

    @Override
    void perform() throws Exception {
      LOG.info(String.format("Performing action: Rolling batch restarting %d%% of region servers",
          (int)(ratio * 100)));
      Random random = new Random();
      List<ServerName> selectedServers = selectRandomItems(getCurrentServers(), ratio);

      Queue<ServerName> serversToBeKilled = new LinkedList<ServerName>(selectedServers);
      Queue<ServerName> deadServers = new LinkedList<ServerName>();

      while (!serversToBeKilled.isEmpty() || !deadServers.isEmpty()) {
        boolean action = true; //action true = kill server, false = start server

        if (serversToBeKilled.isEmpty() || deadServers.isEmpty()) {
          action = deadServers.isEmpty();
        } else {
          action = random.nextBoolean();
        }

        if (action) {
          ServerName server = serversToBeKilled.remove();
          killRs(server);
          deadServers.add(server);
        } else {
          ServerName server = deadServers.remove();
          startRs(server);
        }

        sleep(random.nextInt((int)sleepTime));
      }
    }
  }

  /**
   * A context for a Policy
   */
  private static class PolicyContext extends ActionContext {
    PolicyContext(IntegrationTestingUtility util) {
      super(util);
    }
  }

  /**
   * A policy to introduce chaos to the cluster
   */
  private static abstract class Policy extends StoppableImplementation implements Runnable {
    PolicyContext context;
    public void init(PolicyContext context) throws Exception {
      this.context = context;
    }
  }

  /**
   * A policy, which picks a random action according to the given weights,
   * and performs it every configurable period.
   */
  private static class PeriodicRandomActionPolicy extends Policy {
    private long period;
    private List<Pair<Action, Integer>> actions;

    PeriodicRandomActionPolicy(long period, List<Pair<Action, Integer>> actions) {
      this.period = period;
      this.actions = actions;
    }

    @Override
    public void run() {
      //add some jitter
      int jitter = new Random().nextInt((int)period);
      LOG.info("Sleeping for " + jitter + " to add jitter");
      Threads.sleep(jitter);

      while (!isStopped()) {
        long start = System.currentTimeMillis();
        Action action = selectWeightedRandomItem(actions);

        try {
          action.perform();
        } catch (Exception ex) {
          LOG.warn("Exception occurred during performing action: "
              + StringUtils.stringifyException(ex));
        }

        long sleepTime = period - (System.currentTimeMillis() - start);
        if (sleepTime > 0) {
          LOG.info("Sleeping for:" + sleepTime);
          Threads.sleep(sleepTime);
        }
      }
    }

    @Override
    public void init(PolicyContext context) throws Exception {
      super.init(context);
      LOG.info("Using ChaosMonkey Policy: " + this.getClass() + ", period:" + period);
      for (Pair<Action, Integer> action : actions) {
        action.getFirst().init(this.context);
      }
    }
  }

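Any weighted action list can be wrapped into such a policy. A minimal sketch, assuming the FIVE_SEC constant and the Lists/Pair helpers already used elsewhere in this class (the 30-second period is an arbitrary illustration):

  // Sketch only: a policy that restarts one random region server roughly
  // every 30 seconds; with a single entry, the weight (1) is irrelevant.
  Policy restartOnly = new PeriodicRandomActionPolicy(30 * 1000,
      Lists.newArrayList(new Pair<Action, Integer>(new RestartRandomRs(FIVE_SEC), 1)));
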
  /** Selects a random item from the given items */
  static <T> T selectRandomItem(T[] items) {
    Random random = new Random();
    return items[random.nextInt(items.length)];
  }

  /** Selects a random item from the given items with weights */
  static <T> T selectWeightedRandomItem(List<Pair<T, Integer>> items) {
    Random random = new Random();
    int totalWeight = 0;
    for (Pair<T, Integer> pair : items) {
      totalWeight += pair.getSecond();
    }

    int cutoff = random.nextInt(totalWeight);
    int cumulative = 0;
    T item = null;

    //warn: O(n)
    for (int i = 0; i < items.size(); i++) {
      int curWeight = items.get(i).getSecond();
      if (cutoff < cumulative + curWeight) {
        item = items.get(i).getFirst();
        break;
      }
      cumulative += curWeight;
    }

    return item;
  }

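The cutoff scan above returns each item with probability weight/totalWeight: a cutoff is drawn uniformly in [0, totalWeight) and the first item whose cumulative weight exceeds it wins. A sketch of that behavior; the tally loop and names below are hypothetical test scaffolding, not part of the commit:

  List<Pair<String, Integer>> weighted = Lists.newArrayList(
      new Pair<String, Integer>("restart", 2),
      new Pair<String, Integer>("batch", 1));
  int restarts = 0;
  for (int i = 0; i < 30000; i++) {
    if ("restart".equals(selectWeightedRandomItem(weighted))) {
      restarts++;
    }
  }
  // restarts should land near 20000, i.e. 2/(2+1) of the draws.
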
  /** Selects and returns ceil(ratio * items.length) random items from the given array */
  static <T> List<T> selectRandomItems(T[] items, float ratio) {
    Random random = new Random();
    int remaining = (int) Math.ceil(items.length * ratio);

    List<T> selectedItems = new ArrayList<T>(remaining);

    for (int i = 0; i < items.length && remaining > 0; i++) {
      if (random.nextFloat() < ((float) remaining / (items.length - i))) {
        selectedItems.add(items[i]);
        remaining--;
      }
    }

    return selectedItems;
  }

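This one-pass scan is the classic selection-sampling technique: item i is taken with probability remaining/(items.length - i), so the method always returns exactly ceil(ratio * items.length) items and every item has the same chance of being picked. A hypothetical call, for illustration only:

  String[] servers = {"rs1", "rs2", "rs3", "rs4"};
  List<String> half = selectRandomItems(servers, 0.5f);
  // half.size() == 2 on every call; which two varies run to run.
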
  /**
   * All actions that deal with RS's with the following weights (relative probabilities):
   *  - Restart active master (sleep 5 sec)                    : 2
   *  - Restart random regionserver (sleep 5 sec)              : 2
   *  - Restart random regionserver (sleep 60 sec)             : 2
   *  - Restart META regionserver (sleep 5 sec)                : 1
   *  - Restart ROOT regionserver (sleep 5 sec)                : 1
   *  - Batch restart of 50% of regionservers (sleep 5 sec)    : 2
   *  - Rolling restart of 100% of regionservers (sleep 5 sec) : 2
   */
  @SuppressWarnings("unchecked")
  private static final List<Pair<Action, Integer>> ALL_ACTIONS = Lists.newArrayList(
      new Pair<Action,Integer>(new RestartActiveMaster(FIVE_SEC), 2),
      new Pair<Action,Integer>(new RestartRandomRs(FIVE_SEC), 2),
      new Pair<Action,Integer>(new RestartRandomRs(ONE_MIN), 2),
      new Pair<Action,Integer>(new RestartRsHoldingMeta(FIVE_SEC), 1),
      new Pair<Action,Integer>(new RestartRsHoldingRoot(FIVE_SEC), 1),
      new Pair<Action,Integer>(new BatchRestartRs(FIVE_SEC, 0.5f), 2),
      new Pair<Action,Integer>(new RollingBatchRestartRs(FIVE_SEC, 1.0f), 2)
  );

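The weights are relative, not percentages. With a total of 2+2+2+1+1+2+2 = 12, each tick of a policy built on this list restarts the active master with probability 2/12 (about 17%), the META- or ROOT-hosting server with probability 1/12 (about 8%) each, and so on for the remaining actions.
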
  public static final String EVERY_MINUTE_RANDOM_ACTION_POLICY = "EVERY_MINUTE_RANDOM_ACTION_POLICY";

  private Policy[] policies;
  private Thread[] monkeyThreads;

  public void start() throws Exception {
    monkeyThreads = new Thread[policies.length];

    for (int i = 0; i < policies.length; i++) {
      policies[i].init(new PolicyContext(this.util));
      Thread monkeyThread = new Thread(policies[i]);
      monkeyThread.start();
      monkeyThreads[i] = monkeyThread;
    }
  }

  @Override
  public void stop(String why) {
    for (Policy policy : policies) {
      policy.stop(why);
    }
  }

  @Override
  public boolean isStopped() {
    return policies[0].isStopped();
  }

  /**
   * Wait for ChaosMonkey to stop.
   * @throws InterruptedException
   */
  public void waitForStop() throws InterruptedException {
    for (Thread monkeyThread : monkeyThreads) {
      monkeyThread.join();
    }
  }

  private static final Map<String, Policy> NAMED_POLICIES = Maps.newHashMap();
  static {
    NAMED_POLICIES.put(EVERY_MINUTE_RANDOM_ACTION_POLICY,
        new PeriodicRandomActionPolicy(ONE_MIN, ALL_ACTIONS));
  }

  @Override
  protected void addOptions() {
    addOptWithArg("policy", "a named policy defined in ChaosMonkey.java. Possible values: "
        + NAMED_POLICIES.keySet());
    //we can add more options, and make policies more configurable
  }

  @Override
  protected void processOptions(CommandLine cmd) {
    String[] policies = cmd.getOptionValues("policy");
    if (policies != null) {
      setPoliciesByName(policies);
    }
  }

  @Override
  protected int doWork() throws Exception {
    start();
    waitForStop();
    return 0;
  }

  public static void main(String[] args) throws Exception {
    Configuration conf = HBaseConfiguration.create();
    IntegrationTestingUtility.setUseDistributedCluster(conf);
    IntegrationTestingUtility util = new IntegrationTestingUtility(conf);
    util.initializeCluster(1);

    ChaosMonkey monkey = new ChaosMonkey(util, EVERY_MINUTE_RANDOM_ACTION_POLICY);
    int ret = ToolRunner.run(conf, monkey, args);
    System.exit(ret);
  }
}
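For orientation, a test could also drive the monkey programmatically rather than through main(). A sketch under the assumption that the constructor and lifecycle methods are used exactly as in main() above; the cluster size and workload call are hypothetical scaffolding:

  Configuration conf = HBaseConfiguration.create();
  IntegrationTestingUtility util = new IntegrationTestingUtility(conf);
  util.initializeCluster(3);  // hypothetical: require at least 3 servers

  ChaosMonkey monkey = new ChaosMonkey(util, EVERY_MINUTE_RANDOM_ACTION_POLICY);
  monkey.start();
  // ... run the actual test workload here ...
  monkey.stop("test finished");
  monkey.waitForStop();
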
@ -65,7 +65,7 @@ public abstract class AbstractHBaseTool implements Tool {
  protected abstract void processOptions(CommandLine cmd);
  protected abstract void processOptions(CommandLine cmd);

  /** The "main function" of the tool */
  /** The "main function" of the tool */
  protected abstract void doWork() throws Exception;
  protected abstract int doWork() throws Exception;

  @Override
  @Override
  public Configuration getConf() {
  public Configuration getConf() {

@ -101,13 +101,14 @@ public abstract class AbstractHBaseTool implements Tool {
    processOptions(cmd);
    processOptions(cmd);

    int ret = EXIT_FAILURE;
    try {
    try {
      doWork();
      ret = doWork();
    } catch (Exception e) {
    } catch (Exception e) {
      LOG.error("Error running command-line tool", e);
      LOG.error("Error running command-line tool", e);
      return EXIT_FAILURE;
      return EXIT_FAILURE;
    }
    }
    return EXIT_SUCCESS;
    return ret;
  }
  }

  private boolean sanityCheckOptions(CommandLine cmd) {
  private boolean sanityCheckOptions(CommandLine cmd) {

@ -128,7 +128,7 @@ public class Threads {
  /**
  /**
   * @param millis How long to sleep for in milliseconds.
   * @param millis How long to sleep for in milliseconds.
   */
   */
  public static void sleep(int millis) {
  public static void sleep(long millis) {
    try {
    try {
      Thread.sleep(millis);
      Thread.sleep(millis);
    } catch (InterruptedException e) {
    } catch (InterruptedException e) {
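The doWork() signature change above lets a tool report its own exit code through ToolRunner. A minimal hypothetical subclass (PingTool and its check are invented for illustration; EXIT_SUCCESS and EXIT_FAILURE are the constants referenced in the hunk above):

  public class PingTool extends AbstractHBaseTool {
    @Override
    protected void addOptions() {
      // no options for this sketch
    }

    @Override
    protected void processOptions(CommandLine cmd) {
    }

    @Override
    protected int doWork() throws Exception {
      // the return value now becomes the process exit code via run()
      return clusterReachable() ? EXIT_SUCCESS : EXIT_FAILURE;
    }

    private boolean clusterReachable() {
      return true; // hypothetical health check
    }
  }
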
@ -0,0 +1,280 @@
/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase;

import java.io.Closeable;
import java.io.IOException;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.conf.Configurable;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.client.AdminProtocol;
import org.apache.hadoop.hbase.client.ClientProtocol;
import org.apache.hadoop.hbase.util.Threads;

/**
 * This class defines methods that can help with managing HBase clusters
 * from unit tests and system tests. There are 3 types of cluster deployments:
 * <ul>
 * <li><b>MiniHBaseCluster:</b> each server is run in the same JVM in separate threads,
 * used by unit tests</li>
 * <li><b>DistributedHBaseCluster:</b> the cluster is pre-deployed, system and integration tests can
 * interact with the cluster. </li>
 * <li><b>ProcessBasedLocalHBaseCluster:</b> each server is deployed locally but in separate
 * JVMs. </li>
 * </ul>
 * <p>
 * HBaseCluster unifies the way tests interact with the cluster, so that the same test can
 * be run against a mini-cluster during unit test execution, or a distributed cluster having
 * tens/hundreds of nodes during execution of integration tests.
 *
 * <p>
 * HBaseCluster exposes client-side public interfaces to tests, so that tests do not assume
 * running in a particular mode. Not all the tests are suitable to be run on an actual cluster,
 * and some tests will still need to mock stuff and introspect internal state. For those use
 * cases from unit tests, or if more control is needed, you can use the subclasses directly.
 * In that sense, this class does not abstract away <strong>every</strong> interface that
 * MiniHBaseCluster or DistributedHBaseCluster provide.
 */
@InterfaceAudience.Private
public abstract class HBaseCluster implements Closeable, Configurable {
  static final Log LOG = LogFactory.getLog(HBaseCluster.class.getName());
  protected Configuration conf;

  /** the status of the cluster before we begin */
  protected ClusterStatus initialClusterStatus;

  /**
   * Construct an HBaseCluster
   * @param conf Configuration to be used for cluster
   */
  public HBaseCluster(Configuration conf) {
    setConf(conf);
  }

  @Override
  public void setConf(Configuration conf) {
    this.conf = conf;
  }

  @Override
  public Configuration getConf() {
    return conf;
  }

  /**
   * Returns a ClusterStatus for this HBase cluster.
   * @see #getInitialClusterStatus()
   */
  public abstract ClusterStatus getClusterStatus() throws IOException;

  /**
   * Returns a ClusterStatus for this HBase cluster as observed at the
   * starting of the HBaseCluster
   */
  public ClusterStatus getInitialClusterStatus() throws IOException {
    return initialClusterStatus;
  }

  /**
   * Returns a {@link MasterAdminProtocol} to the active master
   */
  public abstract MasterAdminProtocol getMasterAdmin()
      throws IOException;

  /**
   * Returns a {@link MasterMonitorProtocol} to the active master
   */
  public abstract MasterMonitorProtocol getMasterMonitor()
      throws IOException;

  /**
   * Returns an AdminProtocol interface to the regionserver
   */
  public abstract AdminProtocol getAdminProtocol(ServerName serverName) throws IOException;

  /**
   * Returns a ClientProtocol interface to the regionserver
   */
  public abstract ClientProtocol getClientProtocol(ServerName serverName) throws IOException;

  /**
   * Starts a new region server on the given hostname or if this is a mini/local cluster,
   * starts a region server locally.
   * @param hostname the hostname to start the regionserver on
   * @throws IOException if something goes wrong
   */
  public abstract void startRegionServer(String hostname) throws IOException;

  /**
   * Kills the region server process if this is a distributed cluster, otherwise
   * this causes the region server to exit doing basic clean up only.
   * @throws IOException if something goes wrong
   */
  public abstract void killRegionServer(ServerName serverName) throws IOException;

  /**
   * Stops the given region server, by attempting a gradual stop.
   * @throws IOException if something goes wrong
   */
  public abstract void stopRegionServer(ServerName serverName) throws IOException;

  /**
   * Wait for the specified region server to join the cluster
   * @throws IOException if something goes wrong or timeout occurs
   */
  public void waitForRegionServerToStart(String hostname, long timeout)
      throws IOException {
    long start = System.currentTimeMillis();
    while ((System.currentTimeMillis() - start) < timeout) {
      for (ServerName server : getClusterStatus().getServers()) {
        if (server.getHostname().equals(hostname)) {
          return;
        }
      }
      Threads.sleep(100);
    }
    throw new IOException("did timeout waiting for region server to start:" + hostname);
  }

  /**
   * Wait for the specified region server to stop the thread / process.
   * @throws IOException if something goes wrong or timeout occurs
   */
  public abstract void waitForRegionServerToStop(ServerName serverName, long timeout)
      throws IOException;

  /**
   * Starts a new master on the given hostname or if this is a mini/local cluster,
   * starts a master locally.
   * @param hostname the hostname to start the master on
   * @throws IOException if something goes wrong
   */
  public abstract void startMaster(String hostname) throws IOException;

  /**
   * Kills the master process if this is a distributed cluster, otherwise,
   * this causes master to exit doing basic clean up only.
   * @throws IOException if something goes wrong
   */
  public abstract void killMaster(ServerName serverName) throws IOException;

  /**
   * Stops the given master, by attempting a gradual stop.
   * @throws IOException if something goes wrong
   */
  public abstract void stopMaster(ServerName serverName) throws IOException;

  /**
   * Wait for the specified master to stop the thread / process.
   * @throws IOException if something goes wrong or timeout occurs
   */
  public abstract void waitForMasterToStop(ServerName serverName, long timeout)
      throws IOException;

  /**
   * Blocks until there is an active master and that master has completed
   * initialization.
   *
   * @return true if an active master becomes available.  false if there are no
   *         masters left.
   * @throws IOException if something goes wrong or timeout occurs
   */
  public boolean waitForActiveAndReadyMaster()
      throws IOException {
    return waitForActiveAndReadyMaster(Long.MAX_VALUE);
  }

  /**
   * Blocks until there is an active master and that master has completed
   * initialization.
   * @param timeout the timeout limit in ms
   * @return true if an active master becomes available.  false if there are no
   *         masters left.
   */
  public abstract boolean waitForActiveAndReadyMaster(long timeout)
      throws IOException;

  /**
   * Wait for HBase Cluster to shut down.
   */
  public abstract void waitUntilShutDown() throws IOException;

  /**
   * Shut down the HBase cluster
   */
  public abstract void shutdown() throws IOException;

  /**
   * Restores the cluster to its initial state if this is a real cluster,
   * otherwise does nothing.
   */
  public void restoreInitialStatus() throws IOException {
    restoreClusterStatus(getInitialClusterStatus());
  }

  /**
   * Restores the cluster to the given state if this is a real cluster,
   * otherwise does nothing.
   */
  public void restoreClusterStatus(ClusterStatus desiredStatus) throws IOException {
  }

  /**
   * Get the ServerName of region server serving ROOT region
   */
  public ServerName getServerHoldingRoot() throws IOException {
    return getServerHoldingRegion(HRegionInfo.ROOT_REGIONINFO.getRegionName());
  }

  /**
   * Get the ServerName of region server serving the first META region
   */
  public ServerName getServerHoldingMeta() throws IOException {
    return getServerHoldingRegion(HRegionInfo.FIRST_META_REGIONINFO.getRegionName());
  }

  /**
   * Get the ServerName of region server serving the specified region
   * @param regionName Name of the region in bytes
   * @return ServerName that hosts the region or null
   */
  public abstract ServerName getServerHoldingRegion(byte[] regionName) throws IOException;

  /**
   * @return whether we are interacting with a distributed cluster as opposed to an
   * in-process mini/local cluster.
   */
  public boolean isDistributedCluster() {
    return false;
  }

  /**
   * Closes all the resources held open for this cluster. Note that this call does not shutdown
   * the cluster.
   * @see #shutdown()
   */
  @Override
  public abstract void close() throws IOException;
}
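A sketch of a deployment-agnostic test step written against this API. The `cluster` handle (obtained from a testing utility) and the 60-second timeout are assumptions; the same code would run unchanged against either a mini or a distributed cluster:

  HBaseCluster cluster = util.getHBaseClusterInterface(); // util: an HBaseTestingUtility, assumed in scope
  ServerName metaHost = cluster.getServerHoldingMeta();
  if (metaHost != null) {
    cluster.killRegionServer(metaHost);
    cluster.waitForRegionServerToStop(metaHost, 60000);
    cluster.startRegionServer(metaHost.getHostname());
    cluster.waitForRegionServerToStart(metaHost.getHostname(), 60000);
  }
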
@ -102,7 +102,9 @@ import org.apache.zookeeper.ZooKeeper;
 * old HBaseTestCase and HBaseClusterTestCase functionality.
 * old HBaseTestCase and HBaseClusterTestCase functionality.
 * Create an instance and keep it around testing HBase.  This class is
 * Create an instance and keep it around testing HBase.  This class is
 * meant to be your one-stop shop for anything you might need testing.  Manages
 * meant to be your one-stop shop for anything you might need testing.  Manages
 * one cluster at a time only.
 * one cluster at a time only. Managed cluster can be an in-process
 * {@link MiniHBaseCluster}, or a deployed cluster of type {@link DistributedHBaseCluster}.
 * Not all methods work with the real cluster.
 * Depends on log4j being on classpath and
 * Depends on log4j being on classpath and
 * hbase-site.xml for logging and test-run configuration.  It does not set
 * hbase-site.xml for logging and test-run configuration.  It does not set
 * logging levels nor make changes to configuration parameters.
 * logging levels nor make changes to configuration parameters.

@ -127,7 +129,7 @@ public class HBaseTestingUtility {
  private boolean passedZkCluster = false;
  private boolean passedZkCluster = false;
  private MiniDFSCluster dfsCluster = null;
  private MiniDFSCluster dfsCluster = null;

  private MiniHBaseCluster hbaseCluster = null;
  private HBaseCluster hbaseCluster = null;
  private MiniMRCluster mrCluster = null;
  private MiniMRCluster mrCluster = null;

  /** If there is a mini cluster running for this testing utility instance. */
  /** If there is a mini cluster running for this testing utility instance. */

@ -230,6 +232,10 @@ public class HBaseTestingUtility {
    return this.conf;
    return this.conf;
  }
  }

  public void setHBaseCluster(HBaseCluster hbaseCluster) {
    this.hbaseCluster = hbaseCluster;
  }

  /**
  /**
   * @return Where to write test data on local filesystem; usually
   * @return Where to write test data on local filesystem; usually
   * {@link #DEFAULT_BASE_TEST_DIRECTORY}
   * {@link #DEFAULT_BASE_TEST_DIRECTORY}

@ -697,7 +703,7 @@ public class HBaseTestingUtility {
    getHBaseAdmin(); // create immediately the hbaseAdmin
    getHBaseAdmin(); // create immediately the hbaseAdmin
    LOG.info("Minicluster is up");
    LOG.info("Minicluster is up");
    return this.hbaseCluster;
    return (MiniHBaseCluster)this.hbaseCluster;
  }
  }

  /**
  /**

@ -725,7 +731,11 @@ public class HBaseTestingUtility {
   * @see #startMiniCluster()
   * @see #startMiniCluster()
   */
   */
  public MiniHBaseCluster getMiniHBaseCluster() {
  public MiniHBaseCluster getMiniHBaseCluster() {
    return this.hbaseCluster;
    if (this.hbaseCluster instanceof MiniHBaseCluster) {
      return (MiniHBaseCluster)this.hbaseCluster;
    }
    throw new RuntimeException(hbaseCluster + " not an instance of " +
        MiniHBaseCluster.class.getName());
  }
  }

  /**
  /**

@ -764,7 +774,7 @@ public class HBaseTestingUtility {
    if (this.hbaseCluster != null) {
    if (this.hbaseCluster != null) {
      this.hbaseCluster.shutdown();
      this.hbaseCluster.shutdown();
      // Wait till hbase is down before going on to shutdown zk.
      // Wait till hbase is down before going on to shutdown zk.
      this.hbaseCluster.join();
      this.hbaseCluster.waitUntilShutDown();
      this.hbaseCluster = null;
      this.hbaseCluster = null;
    }
    }
  }
  }

@ -802,7 +812,7 @@ public class HBaseTestingUtility {
   * @throws IOException
   * @throws IOException
   */
   */
  public void flush() throws IOException {
  public void flush() throws IOException {
    this.hbaseCluster.flushcache();
    getMiniHBaseCluster().flushcache();
  }
  }

  /**
  /**

@ -810,7 +820,7 @@ public class HBaseTestingUtility {
   * @throws IOException
   * @throws IOException
   */
   */
  public void flush(byte [] tableName) throws IOException {
  public void flush(byte [] tableName) throws IOException {
    this.hbaseCluster.flushcache(tableName);
    getMiniHBaseCluster().flushcache(tableName);
  }
  }

  /**
  /**

@ -818,7 +828,7 @@ public class HBaseTestingUtility {
   * @throws IOException
   * @throws IOException
   */
   */
  public void compact(boolean major) throws IOException {
  public void compact(boolean major) throws IOException {
    this.hbaseCluster.compact(major);
    getMiniHBaseCluster().compact(major);
  }
  }

  /**
  /**

@ -826,7 +836,7 @@ public class HBaseTestingUtility {
   * @throws IOException
   * @throws IOException
   */
   */
  public void compact(byte [] tableName, boolean major) throws IOException {
  public void compact(byte [] tableName, boolean major) throws IOException {
    this.hbaseCluster.compact(tableName, major);
    getMiniHBaseCluster().compact(tableName, major);
  }
  }

@ -1242,9 +1252,10 @@ public class HBaseTestingUtility {
    HConnection conn = table.getConnection();
    HConnection conn = table.getConnection();
    conn.clearRegionCache();
    conn.clearRegionCache();
    // assign all the new regions IF table is enabled.
    // assign all the new regions IF table is enabled.
    if (getHBaseAdmin().isTableEnabled(table.getTableName())) {
    HBaseAdmin admin = getHBaseAdmin();
    if (admin.isTableEnabled(table.getTableName())) {
      for(HRegionInfo hri : newRegions) {
      for(HRegionInfo hri : newRegions) {
        hbaseCluster.getMaster().assignRegion(hri);
        admin.assign(hri.getRegionName());
      }
      }
    }
    }

@ -1351,8 +1362,8 @@ public class HBaseTestingUtility {
      Bytes.toString(tableName));
      Bytes.toString(tableName));
    byte [] firstrow = metaRows.get(0);
    byte [] firstrow = metaRows.get(0);
    LOG.debug("FirstRow=" + Bytes.toString(firstrow));
    LOG.debug("FirstRow=" + Bytes.toString(firstrow));
    int index = hbaseCluster.getServerWith(firstrow);
    int index = getMiniHBaseCluster().getServerWith(firstrow);
    return hbaseCluster.getRegionServerThreads().get(index).getRegionServer();
    return getMiniHBaseCluster().getRegionServerThreads().get(index).getRegionServer();
  }
  }

  /**
  /**

@ -1468,7 +1479,7 @@ public class HBaseTestingUtility {
   * @throws Exception
   * @throws Exception
   */
   */
  public void expireMasterSession() throws Exception {
  public void expireMasterSession() throws Exception {
    HMaster master = hbaseCluster.getMaster();
    HMaster master = getMiniHBaseCluster().getMaster();
    expireSession(master.getZooKeeper(), false);
    expireSession(master.getZooKeeper(), false);
  }
  }

@ -1478,7 +1489,7 @@ public class HBaseTestingUtility {
   * @throws Exception
   * @throws Exception
   */
   */
  public void expireRegionServerSession(int index) throws Exception {
  public void expireRegionServerSession(int index) throws Exception {
    HRegionServer rs = hbaseCluster.getRegionServer(index);
    HRegionServer rs = getMiniHBaseCluster().getRegionServer(index);
    expireSession(rs.getZooKeeper(), false);
    expireSession(rs.getZooKeeper(), false);
  }
  }

@ -1542,13 +1553,27 @@ public class HBaseTestingUtility {
    }
    }
  }
  }

  /**
  /**
   * Get the HBase cluster.
   * Get the Mini HBase cluster.
   *
   *
   * @return hbase cluster
   * @return hbase cluster
   * @see #getHBaseClusterInterface()
   */
   */
  public MiniHBaseCluster getHBaseCluster() {
  public MiniHBaseCluster getHBaseCluster() {
    return getMiniHBaseCluster();
  }

  /**
   * Returns the HBaseCluster instance.
   * <p>Returned object can be any of the subclasses of HBaseCluster, and the
   * tests referring to this should not assume that the cluster is a mini cluster or a
   * distributed one. If the test only works on a mini cluster, then the specific
   * method {@link #getMiniHBaseCluster()} can be used instead w/o the
   * need to type-cast.
   */
  public HBaseCluster getHBaseClusterInterface() {
    //implementation note: we should rename this method as #getHBaseCluster(),
    //but this would require refactoring 90+ calls.
    return hbaseCluster;
    return hbaseCluster;
  }
  }
@ -1739,7 +1764,7 @@ public class HBaseTestingUtility {
  public boolean ensureSomeRegionServersAvailable(final int num)
  public boolean ensureSomeRegionServersAvailable(final int num)
      throws IOException {
      throws IOException {
    boolean startedServer = false;
    boolean startedServer = false;
    MiniHBaseCluster hbaseCluster = getMiniHBaseCluster();
    for (int i=hbaseCluster.getLiveRegionServerThreads().size(); i<num; ++i) {
    for (int i=hbaseCluster.getLiveRegionServerThreads().size(); i<num; ++i) {
      LOG.info("Started new server=" + hbaseCluster.startRegionServer());
      LOG.info("Started new server=" + hbaseCluster.startRegionServer());
      startedServer = true;
      startedServer = true;

@ -1762,12 +1787,12 @@ public class HBaseTestingUtility {
    boolean startedServer = ensureSomeRegionServersAvailable(num);
    boolean startedServer = ensureSomeRegionServersAvailable(num);

    for (JVMClusterUtil.RegionServerThread rst :
    for (JVMClusterUtil.RegionServerThread rst :
        hbaseCluster.getRegionServerThreads()) {
        getMiniHBaseCluster().getRegionServerThreads()) {

      HRegionServer hrs = rst.getRegionServer();
      HRegionServer hrs = rst.getRegionServer();
      if (hrs.isStopping() || hrs.isStopped()) {
      if (hrs.isStopping() || hrs.isStopped()) {
        LOG.info("A region server is stopped or stopping:"+hrs);
        LOG.info("A region server is stopped or stopping:"+hrs);
        LOG.info("Started new server=" + hbaseCluster.startRegionServer());
        LOG.info("Started new server=" + getMiniHBaseCluster().startRegionServer());
        startedServer = true;
        startedServer = true;
      }
      }
    }
    }

@ -2039,7 +2064,7 @@ public class HBaseTestingUtility {
        numRegions);
        numRegions);

    if (hbaseCluster != null) {
    if (hbaseCluster != null) {
      hbaseCluster.flushcache(HConstants.META_TABLE_NAME);
      getMiniHBaseCluster().flushcache(HConstants.META_TABLE_NAME);
    }
    }

    for (int iFlush = 0; iFlush < numFlushes; ++iFlush) {
    for (int iFlush = 0; iFlush < numFlushes; ++iFlush) {

@ -2076,7 +2101,7 @@ public class HBaseTestingUtility {
      LOG.info("Initiating flush #" + iFlush + " for table " + tableName);
      LOG.info("Initiating flush #" + iFlush + " for table " + tableName);
      table.flushCommits();
      table.flushCommits();
      if (hbaseCluster != null) {
      if (hbaseCluster != null) {
        hbaseCluster.flushcache(tableNameBytes);
        getMiniHBaseCluster().flushcache(tableNameBytes);
      }
      }
    }
    }

@ -0,0 +1,39 @@
/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.hadoop.hbase;

/**
 * Tag a test as 'integration/system' test, meaning that the test class has the following
 * characteristics: <ul>
 *  <li> Possibly takes hours to complete</li>
 *  <li> Can be run on a mini cluster or an actual cluster</li>
 *  <li> Can make changes to the given cluster (starting/stopping daemons, etc)</li>
 *  <li> Should not be run in parallel with other integration tests</li>
 * </ul>
 *
 * Integration / System tests should have a class name starting with "IntegrationTest", and
 * should be annotated with @Category(IntegrationTests.class). Integration tests can be run
 * using the IntegrationTestsDriver class or from mvn verify.
 *
 * @see SmallTests
 * @see MediumTests
 * @see LargeTests
 */
public interface IntegrationTests {
}
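A hypothetical skeleton following these conventions; only the naming and annotation rules come from the javadoc above, the class body is invented for illustration:

  @Category(IntegrationTests.class)
  public class IntegrationTestExample {
    @Test
    public void testSurvivesChaos() throws Exception {
      // a real test would initialize an IntegrationTestingUtility here and
      // exercise the cluster, possibly alongside a ChaosMonkey
    }
  }
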
@ -32,6 +32,7 @@ package org.apache.hadoop.hbase;
 *
 *
 * @see SmallTests
 * @see SmallTests
 * @see MediumTests
 * @see MediumTests
 * @see IntegrationTests
 */
 */
public interface LargeTests {
public interface LargeTests {
}
}

@ -31,6 +31,7 @@ package org.apache.hadoop.hbase;
 *
 *
 * @see SmallTests
 * @see SmallTests
 * @see LargeTests
 * @see LargeTests
 * @see IntegrationTests
 */
 */
public interface MediumTests {
public interface MediumTests {
}
}

@ -29,6 +29,8 @@ import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.hbase.client.AdminProtocol;
import org.apache.hadoop.hbase.client.ClientProtocol;
import org.apache.hadoop.hbase.client.HConnectionManager;
import org.apache.hadoop.hbase.client.HConnectionManager;
import org.apache.hadoop.hbase.master.HMaster;
import org.apache.hadoop.hbase.master.HMaster;
import org.apache.hadoop.hbase.protobuf.generated.RegionServerStatusProtos.RegionServerStartupResponse;
import org.apache.hadoop.hbase.protobuf.generated.RegionServerStatusProtos.RegionServerStartupResponse;

@ -36,11 +38,10 @@ import org.apache.hadoop.hbase.regionserver.HRegion;
import org.apache.hadoop.hbase.regionserver.HRegionServer;
import org.apache.hadoop.hbase.regionserver.HRegionServer;
import org.apache.hadoop.hbase.security.User;
import org.apache.hadoop.hbase.security.User;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.JVMClusterUtil;
import org.apache.hadoop.hbase.util.JVMClusterUtil.MasterThread;
import org.apache.hadoop.hbase.util.JVMClusterUtil.MasterThread;
import org.apache.hadoop.hbase.util.JVMClusterUtil.RegionServerThread;
import org.apache.hadoop.hbase.util.JVMClusterUtil.RegionServerThread;
import org.apache.hadoop.hbase.util.JVMClusterUtil;
import org.apache.hadoop.hbase.util.Threads;
import org.apache.hadoop.hbase.util.Threads;
import org.apache.hadoop.io.MapWritable;

/**
/**
 * This class creates a single process HBase cluster.
 * This class creates a single process HBase cluster.

@ -50,9 +51,8 @@ import org.apache.hadoop.io.MapWritable;
 */
 */
@InterfaceAudience.Public
@InterfaceAudience.Public
@InterfaceStability.Evolving
@InterfaceStability.Evolving
public class MiniHBaseCluster {
public class MiniHBaseCluster extends HBaseCluster {
  static final Log LOG = LogFactory.getLog(MiniHBaseCluster.class.getName());
  static final Log LOG = LogFactory.getLog(MiniHBaseCluster.class.getName());
  private Configuration conf;
  public LocalHBaseCluster hbaseCluster;
  public LocalHBaseCluster hbaseCluster;
  private static int index;
  private static int index;

@ -77,18 +77,17 @@ public class MiniHBaseCluster {
  public MiniHBaseCluster(Configuration conf, int numMasters,
  public MiniHBaseCluster(Configuration conf, int numMasters,
      int numRegionServers)
      int numRegionServers)
      throws IOException, InterruptedException {
      throws IOException, InterruptedException {
    this.conf = conf;
    this(conf, numMasters, numRegionServers, null, null);
    conf.set(HConstants.MASTER_PORT, "0");
    init(numMasters, numRegionServers, null, null);
  }
  }

  public MiniHBaseCluster(Configuration conf, int numMasters, int numRegionServers,
  public MiniHBaseCluster(Configuration conf, int numMasters, int numRegionServers,
      Class<? extends HMaster> masterClass,
      Class<? extends HMaster> masterClass,
      Class<? extends MiniHBaseCluster.MiniHBaseClusterRegionServer> regionserverClass)
      Class<? extends MiniHBaseCluster.MiniHBaseClusterRegionServer> regionserverClass)
      throws IOException, InterruptedException {
      throws IOException, InterruptedException {
    this.conf = conf;
    super(conf);
    conf.set(HConstants.MASTER_PORT, "0");
    conf.set(HConstants.MASTER_PORT, "0");
    init(numMasters, numRegionServers, masterClass, regionserverClass);
    init(numMasters, numRegionServers, masterClass, regionserverClass);
    this.initialClusterStatus = getClusterStatus();
  }
  }

  public Configuration getConfiguration() {
  public Configuration getConfiguration() {

@ -229,6 +228,54 @@ public class MiniHBaseCluster {
    }
    }
  }
  }

  @Override
  public void startRegionServer(String hostname) throws IOException {
    this.startRegionServer();
  }

  @Override
  public void killRegionServer(ServerName serverName) throws IOException {
    HRegionServer server = getRegionServer(getRegionServerIndex(serverName));
    if (server instanceof MiniHBaseClusterRegionServer) {
      LOG.info("Killing " + server.toString());
      ((MiniHBaseClusterRegionServer) server).kill();
    } else {
      abortRegionServer(getRegionServerIndex(serverName));
    }
  }

  @Override
  public void stopRegionServer(ServerName serverName) throws IOException {
    stopRegionServer(getRegionServerIndex(serverName));
  }

  @Override
  public void waitForRegionServerToStop(ServerName serverName, long timeout) throws IOException {
    //ignore timeout for now
    waitOnRegionServer(getRegionServerIndex(serverName));
  }

  @Override
  public void startMaster(String hostname) throws IOException {
    this.startMaster();
  }

  @Override
  public void killMaster(ServerName serverName) throws IOException {
    abortMaster(getMasterIndex(serverName));
  }

  @Override
  public void stopMaster(ServerName serverName) throws IOException {
    stopMaster(getMasterIndex(serverName));
  }

  @Override
  public void waitForMasterToStop(ServerName serverName, long timeout) throws IOException {
    //ignore timeout for now
    waitOnMaster(getMasterIndex(serverName));
  }

  /**
  /**
   * Starts a region server thread running
   * Starts a region server thread running
   *
   *

@ -324,6 +371,16 @@ public class MiniHBaseCluster {
    return t;
    return t;
  }
  }

  @Override
  public MasterAdminProtocol getMasterAdmin() {
    return this.hbaseCluster.getActiveMaster();
  }

  @Override
  public MasterMonitorProtocol getMasterMonitor() {
    return this.hbaseCluster.getActiveMaster();
  }

  /**
  /**
   * Returns the current active master, if available.
   * Returns the current active master, if available.
   * @return the active HMaster, null if none is active.
   * @return the active HMaster, null if none is active.

@ -398,15 +455,18 @@ public class MiniHBaseCluster {
   * masters left.
   * masters left.
   * @throws InterruptedException
   * @throws InterruptedException
   */
   */
  public boolean waitForActiveAndReadyMaster() throws InterruptedException {
  public boolean waitForActiveAndReadyMaster(long timeout) throws IOException {
    List<JVMClusterUtil.MasterThread> mts;
    List<JVMClusterUtil.MasterThread> mts;
    while (!(mts = getMasterThreads()).isEmpty()) {
    long start = System.currentTimeMillis();
    while (!(mts = getMasterThreads()).isEmpty()
        && (System.currentTimeMillis() - start) < timeout) {
      for (JVMClusterUtil.MasterThread mt : mts) {
      for (JVMClusterUtil.MasterThread mt : mts) {
        if (mt.getMaster().isActiveMaster() && mt.getMaster().isInitialized()) {
        if (mt.getMaster().isActiveMaster() && mt.getMaster().isInitialized()) {
          return true;
          return true;
        }
        }
      }
      }
      Thread.sleep(100);
      Threads.sleep(100);
    }
    }
    return false;
    return false;
  }
  }

@ -443,6 +503,16 @@ public class MiniHBaseCluster {
    HConnectionManager.deleteAllConnections(false);
    HConnectionManager.deleteAllConnections(false);
  }
  }

  @Override
  public void close() throws IOException {
  }

  @Override
  public ClusterStatus getClusterStatus() throws IOException {
    HMaster master = getMaster();
    return master == null ? null : master.getClusterStatus();
  }

  /**
  /**
   * Call flushCache on all regions on all participating regionservers.
   * Call flushCache on all regions on all participating regionservers.
   * @throws IOException
   * @throws IOException

@ -565,6 +635,15 @@ public class MiniHBaseCluster {
    return index;
    return index;
  }
  }

  @Override
  public ServerName getServerHoldingRegion(byte[] regionName) throws IOException {
    int index = getServerWith(regionName);
    if (index < 0) {
      return null;
    }
    return getRegionServer(index).getServerName();
  }

  /**
  /**
   * Counts the total numbers of regions being served by the currently online
   * Counts the total numbers of regions being served by the currently online
   * region servers by asking each how many regions they have.  Does not look
   * region servers by asking each how many regions they have.  Does not look

@ -591,4 +670,40 @@ public class MiniHBaseCluster {
      masterThread.getMaster().abort("killAll", new Throwable());
      masterThread.getMaster().abort("killAll", new Throwable());
    }
    }
  }
  }

  @Override
  public void waitUntilShutDown() {
    this.hbaseCluster.join();
  }

  protected int getRegionServerIndex(ServerName serverName) {
    //we have a small number of region servers, this should be fine for now.
    List<RegionServerThread> servers = getRegionServerThreads();
    for (int i=0; i < servers.size(); i++) {
      if (servers.get(i).getRegionServer().getServerName().equals(serverName)) {
        return i;
      }
    }
    return -1;
  }

  protected int getMasterIndex(ServerName serverName) {
    List<MasterThread> masters = getMasterThreads();
    for (int i = 0; i < masters.size(); i++) {
      if (masters.get(i).getMaster().getServerName().equals(serverName)) {
        return i;
      }
    }
    return -1;
  }

  @Override
  public AdminProtocol getAdminProtocol(ServerName serverName) throws IOException {
    return getRegionServer(getRegionServerIndex(serverName));
  }

  @Override
  public ClientProtocol getClientProtocol(ServerName serverName) throws IOException {
    return getRegionServer(getRegionServerIndex(serverName));
  }
}
}

@ -28,6 +28,7 @@ package org.apache.hadoop.hbase;
 *
 *
 * @see MediumTests
 * @see MediumTests
 * @see LargeTests
 * @see LargeTests
 * @see IntegrationTests
 */
 */
public interface SmallTests {
public interface SmallTests {
}
}

@ -18,11 +18,11 @@

package org.apache.hadoop.hbase;
package org.apache.hadoop.hbase;

import org.junit.Test;
import static junit.framework.Assert.assertNotNull;
import org.junit.experimental.categories.Category;
import static org.junit.Assert.assertTrue;
import org.junit.runners.Suite;

import java.io.File;
import java.io.File;
import java.io.FileFilter;
import java.io.IOException;
import java.io.IOException;
import java.lang.reflect.Method;
import java.lang.reflect.Method;
import java.lang.reflect.Modifier;
import java.lang.reflect.Modifier;

@ -31,8 +31,9 @@ import java.util.ArrayList;
import java.util.Enumeration;
import java.util.Enumeration;
import java.util.List;
import java.util.List;

import static junit.framework.Assert.assertNotNull;
import org.junit.Test;
import static org.junit.Assert.assertTrue;
import org.junit.experimental.categories.Category;
import org.junit.runners.Suite;

/**
/**

@ -41,8 +42,21 @@ import static org.junit.Assert.assertTrue;
@Category(SmallTests.class)
@Category(SmallTests.class)
public class TestCheckTestClasses {
public class TestCheckTestClasses {

  private FileFilter TEST_CLASS_FILE_FILTER = new FileFilter() {
    @Override
    public boolean accept(File file) {
      return file.isDirectory() || isTestClassFile(file);
    }
    private boolean isTestClassFile(File file) {
      String fileName = file.getName();
      return fileName.endsWith(".class")
        && (fileName.startsWith("Test") || fileName.startsWith("IntegrationTest"));
    }
  };

  /**
  /**
   * Throws an assertion if we find a test class without category (small/medium/large).
   * Throws an assertion if we find a test class without category (small/medium/large/integration).
   * List all the test classes without category in the assertion message.
   * List all the test classes without category in the assertion message.
   */
   */
  @Test
  @Test

@ -50,7 +64,7 @@ public class TestCheckTestClasses {
    List<Class<?>> badClasses = new java.util.ArrayList<Class<?>>();
    List<Class<?>> badClasses = new java.util.ArrayList<Class<?>>();

    for (Class<?> c : findTestClasses()) {
    for (Class<?> c : findTestClasses()) {
      if (!existCategoryAnnotation(c)) {
      if (!existCategoryAnnotation(c, null)) {
        badClasses.add(c);
        badClasses.add(c);
      }
      }
    }
    }

@ -59,9 +73,22 @@ public class TestCheckTestClasses {
        + badClasses, badClasses.isEmpty());
        + badClasses, badClasses.isEmpty());
  }
  }

  /** Returns whether the class has @Category annotation having the xface value.
   */
  private boolean existCategoryAnnotation(Class<?> c, Class<?> xface) {
    Category category = c.getAnnotation(Category.class);

  private boolean existCategoryAnnotation(Class<?> c) {
    if (category != null) {
    return (c.getAnnotation(Category.class) != null);
      if (xface == null) {
        return true;
      }
      for (Class<?> cc : category.value()) {
        if (cc.equals(xface)) {
          return true;
        }
      }
    }
    return false;
  }
  }

  /*
  /*

@ -88,6 +115,19 @@ public class TestCheckTestClasses {
    return false;
    return false;
  }
  }

  /**
   * Finds test classes which are annotated with @Category having xface value
   * @param xface the @Category value
   */
  public List<Class<?>> findTestClasses(Class<?> xface) throws ClassNotFoundException, IOException {
    List<Class<?>> classes = new ArrayList<Class<?>>();
    for (Class<?> c : findTestClasses()) {
      if (existCategoryAnnotation(c, xface)) {
        classes.add(c);
      }
    }
    return classes;
  }

  private List<Class<?>> findTestClasses() throws ClassNotFoundException, IOException {
  private List<Class<?>> findTestClasses() throws ClassNotFoundException, IOException {
    final String packageName = "org.apache.hadoop.hbase";
    final String packageName = "org.apache.hadoop.hbase";

@ -117,14 +157,14 @@ public class TestCheckTestClasses {
      return classes;
      return classes;
    }
    }

    File[] files = baseDirectory.listFiles();
    File[] files = baseDirectory.listFiles(TEST_CLASS_FILE_FILTER);
    assertNotNull(files);
    assertNotNull(files);

    for (File file : files) {
    for (File file : files) {
      final String fileName = file.getName();
      final String fileName = file.getName();
      if (file.isDirectory()) {
      if (file.isDirectory()) {
        classes.addAll(findTestClasses(file, packageName + "." + fileName));
        classes.addAll(findTestClasses(file, packageName + "." + fileName));
      } else if (fileName.endsWith(".class") && fileName.startsWith("Test")) {
      } else {
        Class<?> c = Class.forName(
        Class<?> c = Class.forName(
            packageName + '.' + fileName.substring(0, fileName.length() - 6),
            packageName + '.' + fileName.substring(0, fileName.length() - 6),
            false,
            false,

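Presumably this overload is what lets a driver enumerate only the integration tests; a one-line illustrative use, not taken from the commit itself:

  List<Class<?>> integrationTests =
      new TestCheckTestClasses().findTestClasses(IntegrationTests.class);
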
@@ -34,7 +34,19 @@ import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.hbase.*;
+import org.apache.hadoop.hbase.Abortable;
+import org.apache.hadoop.hbase.ClusterStatus;
+import org.apache.hadoop.hbase.HBaseConfiguration;
+import org.apache.hadoop.hbase.HBaseTestingUtility;
+import org.apache.hadoop.hbase.HColumnDescriptor;
+import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.HRegionInfo;
+import org.apache.hadoop.hbase.HTableDescriptor;
+import org.apache.hadoop.hbase.LargeTests;
+import org.apache.hadoop.hbase.MasterNotRunningException;
+import org.apache.hadoop.hbase.MiniHBaseCluster;
+import org.apache.hadoop.hbase.RegionTransition;
+import org.apache.hadoop.hbase.ServerName;
 import org.apache.hadoop.hbase.executor.EventHandler.EventType;
 import org.apache.hadoop.hbase.protobuf.ProtobufUtil;
 import org.apache.hadoop.hbase.regionserver.HRegion;
@@ -42,9 +54,9 @@ import org.apache.hadoop.hbase.regionserver.HRegionServer;
 import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.hbase.util.FSTableDescriptors;
 import org.apache.hadoop.hbase.util.JVMClusterUtil;
-import org.apache.hadoop.hbase.util.Threads;
 import org.apache.hadoop.hbase.util.JVMClusterUtil.MasterThread;
 import org.apache.hadoop.hbase.util.JVMClusterUtil.RegionServerThread;
+import org.apache.hadoop.hbase.util.Threads;
 import org.apache.hadoop.hbase.zookeeper.ZKAssign;
 import org.apache.hadoop.hbase.zookeeper.ZKTable;
 import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher;
@@ -1042,10 +1054,10 @@ public class TestMasterFailover {
    * @param cluster
    * @return the new active master
    * @throws InterruptedException
-   * @throws MasterNotRunningException
+   * @throws IOException
    */
   private HMaster killActiveAndWaitForNewActive(MiniHBaseCluster cluster)
-      throws InterruptedException, MasterNotRunningException {
+      throws InterruptedException, IOException {
     int activeIndex = getActiveMasterIndex(cluster);
     HMaster active = cluster.getMaster();
     cluster.stopMaster(activeIndex);
@@ -57,6 +57,7 @@ import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.ScanRequest;
 import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.hbase.util.Pair;
 import org.apache.hadoop.hbase.util.PairOfSameType;
+import org.apache.hadoop.hbase.util.StoppableImplementation;
 import org.apache.hadoop.hbase.util.Threads;
 import org.junit.AfterClass;
 import org.junit.BeforeClass;
@@ -179,7 +180,7 @@ public class TestEndToEndSplitTransaction {
     //for daughters.
     HTable table = TEST_UTIL.createTable(TABLENAME, FAMILY);
 
-    Stoppable stopper = new SimpleStoppable();
+    Stoppable stopper = new StoppableImplementation();
     RegionSplitter regionSplitter = new RegionSplitter(table);
     RegionChecker regionChecker = new RegionChecker(conf, stopper, TABLENAME);
 
@@ -202,20 +203,6 @@ public class TestEndToEndSplitTransaction {
     regionChecker.verify();
   }
 
-  private static class SimpleStoppable implements Stoppable {
-    volatile boolean stopped = false;
-
-    @Override
-    public void stop(String why) {
-      this.stopped = true;
-    }
-
-    @Override
-    public boolean isStopped() {
-      return stopped;
-    }
-  }
-
   static class RegionSplitter extends Thread {
     Throwable ex;
     HTable table;
@@ -289,7 +289,7 @@ public class LoadTestTool extends AbstractHBaseTool {
   }
 
   @Override
-  protected void doWork() throws IOException {
+  protected int doWork() throws IOException {
     if (cmd.hasOption(OPT_ZK_QUORUM)) {
       conf.set(HConstants.ZOOKEEPER_QUORUM, cmd.getOptionValue(OPT_ZK_QUORUM));
     }
@@ -335,6 +335,16 @@ public class LoadTestTool extends AbstractHBaseTool {
     if (isRead) {
       readerThreads.waitForFinish();
     }
 
+    boolean success = true;
+    if (isWrite) {
+      success = success && writerThreads.getNumWriteFailures() == 0;
+    }
+    if (isRead) {
+      success = success && readerThreads.getNumReadErrors() == 0
+          && readerThreads.getNumReadFailures() == 0;
+    }
+
+    return success ? 0 : 1;
   }
 
   public static void main(String[] args) {
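Returning an int from doWork lets a run signal success or failure as a conventional exit code: 0 only when no writer or reader thread recorded a failure. A hedged sketch of how a driver might propagate that code to the JVM exit status (illustrative names, not the actual AbstractHBaseTool internals):

// Hedged sketch of a Tool-style driver surfacing doWork()'s int result.
public abstract class ExitCodeToolSketch {
  protected abstract int doWork() throws Exception;

  public void runAndExit(String[] args) {
    int ret;
    try {
      ret = doWork();          // 0 on success, non-zero on failure
    } catch (Exception e) {
      e.printStackTrace();
      ret = 1;                 // a thrown error also counts as a failed run
    }
    System.exit(ret);          // shell scripts can now check $? after the tool runs
  }
}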
@@ -86,7 +86,7 @@ public class RestartMetaTest extends AbstractHBaseTool {
   }
 
   @Override
-  protected void doWork() throws Exception {
+  protected int doWork() throws Exception {
     ProcessBasedLocalHBaseCluster hbaseCluster =
         new ProcessBasedLocalHBaseCluster(conf, NUM_DATANODES, numRegionServers);
     hbaseCluster.startMiniDFS();
@@ -128,6 +128,7 @@ public class RestartMetaTest extends AbstractHBaseTool {
           + Bytes.toStringBinary(result.getFamilyMap(HConstants.CATALOG_FAMILY)
               .get(HConstants.SERVER_QUALIFIER)));
     }
+    return 0;
   }
 
   @Override
@@ -0,0 +1,40 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hbase.util;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.hbase.Stoppable;
+
+/**
+ * A base implementation for a Stoppable service
+ */
+@InterfaceAudience.Private
+public class StoppableImplementation implements Stoppable {
+  volatile boolean stopped = false;
+
+  @Override
+  public void stop(String why) {
+    this.stopped = true;
+  }
+
+  @Override
+  public boolean isStopped() {
+    return stopped;
+  }
+}
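This shared utility replaces one-off copies such as the SimpleStoppable deleted above, so tests can share one trivially stoppable service. A minimal usage sketch, assuming only the class added here:

import org.apache.hadoop.hbase.Stoppable;
import org.apache.hadoop.hbase.util.StoppableImplementation;

// Illustrative only, not part of the patch: a background loop that a test
// shuts down through the shared Stoppable.
public class StoppableDemo {
  public static void main(String[] args) throws InterruptedException {
    final Stoppable stopper = new StoppableImplementation();
    Thread checker = new Thread(new Runnable() {
      @Override
      public void run() {
        while (!stopper.isStopped()) {
          // poll the cluster, verify regions, etc.
        }
      }
    });
    checker.start();
    Thread.sleep(1000);            // let the loop run briefly
    stopper.stop("test finished"); // flips the volatile flag...
    checker.join();                // ...and the loop exits on its next check
  }
}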