HBASE-14625 Chaos Monkey should shut down faster

This commit is contained in:
Elliott Clark 2015-10-15 22:29:17 -04:00
parent f4a4c3a489
commit 16af8df41f
21 changed files with 146 additions and 13 deletions

View File

@ -210,6 +210,11 @@ public class Action {
+ " servers to " + toServers.size() + " different servers"); + " servers to " + toServers.size() + " different servers");
Admin admin = this.context.getHBaseIntegrationTestingUtility().getHBaseAdmin(); Admin admin = this.context.getHBaseIntegrationTestingUtility().getHBaseAdmin();
for (byte[] victimRegion : victimRegions) { for (byte[] victimRegion : victimRegions) {
// Don't keep moving regions if we're
// trying to stop the monkey.
if (context.isStopping()) {
break;
}
int targetIx = RandomUtils.nextInt(toServers.size()); int targetIx = RandomUtils.nextInt(toServers.size());
admin.move(victimRegion, Bytes.toBytes(toServers.get(targetIx).getServerName())); admin.move(victimRegion, Bytes.toBytes(toServers.get(targetIx).getServerName()));
} }
@ -249,5 +254,9 @@ public class Action {
public HBaseCluster getHBaseCluster() { public HBaseCluster getHBaseCluster() {
return util.getHBaseClusterInterface(); return util.getHBaseClusterInterface();
} }
/**
 * Reports whether the chaos monkey is being stopped, so that actions can
 * bail out early. This base implementation always returns false.
 *
 * @return false, always, in this base implementation
 */
public boolean isStopping() {
return false;
}
} }
} }

View File

@ -53,6 +53,11 @@ public class AddColumnAction extends Action {
columnDescriptor = new HColumnDescriptor(RandomStringUtils.randomAlphabetic(5)); columnDescriptor = new HColumnDescriptor(RandomStringUtils.randomAlphabetic(5));
} }
// Don't try the modify if we're stopping
if (context.isStopping()) {
return;
}
LOG.debug("Performing action: Adding " + columnDescriptor + " to " + tableName); LOG.debug("Performing action: Adding " + columnDescriptor + " to " + tableName);
tableDescriptor.addFamily(columnDescriptor); tableDescriptor.addFamily(columnDescriptor);

View File

@ -18,7 +18,9 @@
package org.apache.hadoop.hbase.chaos.actions; package org.apache.hadoop.hbase.chaos.actions;
import java.util.HashSet;
import java.util.List; import java.util.List;
import java.util.Set;
import org.apache.hadoop.hbase.ServerName; import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.chaos.monkies.PolicyBasedChaosMonkey; import org.apache.hadoop.hbase.chaos.monkies.PolicyBasedChaosMonkey;
@ -41,29 +43,39 @@ public class BatchRestartRsAction extends RestartActionBaseAction {
List<ServerName> selectedServers = PolicyBasedChaosMonkey.selectRandomItems(getCurrentServers(), List<ServerName> selectedServers = PolicyBasedChaosMonkey.selectRandomItems(getCurrentServers(),
ratio); ratio);
for (ServerName server : selectedServers) { Set<ServerName> killedServers = new HashSet<ServerName>();
LOG.info("Killing region server:" + server);
cluster.killRegionServer(server);
}
for (ServerName server : selectedServers) { for (ServerName server : selectedServers) {
// Don't keep killing servers if we're
// trying to stop the monkey.
if (context.isStopping()) {
break;
}
LOG.info("Killing region server:" + server);
cluster.killRegionServer(server);
killedServers.add(server);
}
for (ServerName server : killedServers) {
cluster.waitForRegionServerToStop(server, PolicyBasedChaosMonkey.TIMEOUT); cluster.waitForRegionServerToStop(server, PolicyBasedChaosMonkey.TIMEOUT);
} }
LOG.info("Killed " + selectedServers.size() + " region servers. Reported num of rs:" LOG.info("Killed " + killedServers.size() + " region servers. Reported num of rs:"
+ cluster.getClusterStatus().getServersSize()); + cluster.getClusterStatus().getServersSize());
sleep(sleepTime); sleep(sleepTime);
for (ServerName server : selectedServers) { for (ServerName server : killedServers) {
LOG.info("Starting region server:" + server.getHostname()); LOG.info("Starting region server:" + server.getHostname());
cluster.startRegionServer(server.getHostname(), server.getPort()); cluster.startRegionServer(server.getHostname(), server.getPort());
} }
for (ServerName server : selectedServers) { for (ServerName server : killedServers) {
cluster.waitForRegionServerToStart(server.getHostname(), server.getPort(), PolicyBasedChaosMonkey.TIMEOUT); cluster.waitForRegionServerToStart(server.getHostname(),
server.getPort(),
PolicyBasedChaosMonkey.TIMEOUT);
} }
LOG.info("Started " + selectedServers.size() +" region servers. Reported num of rs:" LOG.info("Started " + killedServers.size() +" region servers. Reported num of rs:"
+ cluster.getClusterStatus().getServersSize()); + cluster.getClusterStatus().getServersSize());
} }
} }

View File

@ -73,6 +73,10 @@ public class ChangeBloomFilterAction extends Action {
+ descriptor.getNameAsString() + " of table " + tableName); + descriptor.getNameAsString() + " of table " + tableName);
} }
// Don't try the modify if we're stopping
if (context.isStopping()) {
return;
}
admin.modifyTable(tableName, tableDescriptor); admin.modifyTable(tableName, tableDescriptor);
} }
} }

View File

@ -74,6 +74,11 @@ public class ChangeCompressionAction extends Action {
} }
} }
// Don't try the modify if we're stopping
if (context.isStopping()) {
return;
}
admin.modifyTable(tableName, tableDescriptor); admin.modifyTable(tableName, tableDescriptor);
} }
} }

View File

@ -66,6 +66,10 @@ public class ChangeEncodingAction extends Action {
+ " to: " + descriptor.getDataBlockEncoding()); + " to: " + descriptor.getDataBlockEncoding());
} }
// Don't try the modify if we're stopping
if (context.isStopping()) {
return;
}
admin.modifyTable(tableName, tableDescriptor); admin.modifyTable(tableName, tableDescriptor);
} }
} }

View File

@ -61,6 +61,10 @@ public class ChangeVersionsAction extends Action {
for(HColumnDescriptor descriptor:columnDescriptors) { for(HColumnDescriptor descriptor:columnDescriptors) {
descriptor.setVersions(versions, versions); descriptor.setVersions(versions, versions);
} }
// Don't try the modify if we're stopping
if (context.isStopping()) {
return;
}
LOG.debug("Performing action: Changing versions on " + tableName.getNameAsString()); LOG.debug("Performing action: Changing versions on " + tableName.getNameAsString());
admin.modifyTable(tableName, tableDescriptor); admin.modifyTable(tableName, tableDescriptor);
} }

View File

@ -68,6 +68,11 @@ public class DecreaseMaxHFileSizeAction extends Action {
// Change the table descriptor. // Change the table descriptor.
htd.setMaxFileSize(newValue); htd.setMaxFileSize(newValue);
// Don't try the modify if we're stopping
if (context.isStopping()) {
return;
}
// modify the table. // modify the table.
admin.modifyTable(tableName, htd); admin.modifyTable(tableName, htd);

View File

@ -43,6 +43,11 @@ public class FlushTableAction extends Action {
HBaseTestingUtility util = context.getHBaseIntegrationTestingUtility(); HBaseTestingUtility util = context.getHBaseIntegrationTestingUtility();
Admin admin = util.getHBaseAdmin(); Admin admin = util.getHBaseAdmin();
// Don't try the flush if we're stopping
if (context.isStopping()) {
return;
}
LOG.info("Performing action: Flush table " + tableName); LOG.info("Performing action: Flush table " + tableName);
try { try {
admin.flush(tableName); admin.flush(tableName);

View File

@ -24,6 +24,10 @@ package org.apache.hadoop.hbase.chaos.actions;
public class ForceBalancerAction extends Action { public class ForceBalancerAction extends Action {
@Override @Override
public void perform() throws Exception { public void perform() throws Exception {
// Don't try the balance if we're stopping
if (context.isStopping()) {
return;
}
LOG.info("Balancing regions"); LOG.info("Balancing regions");
forceBalancer(); forceBalancer();
} }

View File

@ -58,6 +58,12 @@ public class MergeRandomAdjacentRegionsOfTableAction extends Action {
HRegionInfo a = regions.get(i++); HRegionInfo a = regions.get(i++);
HRegionInfo b = regions.get(i); HRegionInfo b = regions.get(i);
LOG.debug("Merging " + a.getRegionNameAsString() + " and " + b.getRegionNameAsString()); LOG.debug("Merging " + a.getRegionNameAsString() + " and " + b.getRegionNameAsString());
// Don't try the merge if we're stopping
if (context.isStopping()) {
return;
}
try { try {
admin.mergeRegions(a.getEncodedNameAsBytes(), b.getEncodedNameAsBytes(), false); admin.mergeRegions(a.getEncodedNameAsBytes(), b.getEncodedNameAsBytes(), false);
} catch (Exception ex) { } catch (Exception ex) {

View File

@ -69,6 +69,12 @@ public class MoveRegionsOfTableAction extends Action {
long start = System.currentTimeMillis(); long start = System.currentTimeMillis();
for (HRegionInfo regionInfo:regions) { for (HRegionInfo regionInfo:regions) {
// Don't try the move if we're stopping
if (context.isStopping()) {
return;
}
try { try {
String destServerName = String destServerName =
servers[RandomUtils.nextInt(servers.length)].getServerName(); servers[RandomUtils.nextInt(servers.length)].getServerName();

View File

@ -68,6 +68,10 @@ public class RemoveColumnAction extends Action {
+ tableName.getNameAsString()); + tableName.getNameAsString());
tableDescriptor.removeFamily(colDescName); tableDescriptor.removeFamily(colDescName);
// Don't try the modify if we're stopping
if (context.isStopping()) {
return;
}
admin.modifyTable(tableName, tableDescriptor); admin.modifyTable(tableName, tableDescriptor);
} }
} }

View File

@ -40,6 +40,11 @@ public class RestartActionBaseAction extends Action {
void restartMaster(ServerName server, long sleepTime) throws IOException { void restartMaster(ServerName server, long sleepTime) throws IOException {
sleepTime = Math.max(sleepTime, 1000); sleepTime = Math.max(sleepTime, 1000);
// Don't try the kill if we're stopping
if (context.isStopping()) {
return;
}
killMaster(server); killMaster(server);
sleep(sleepTime); sleep(sleepTime);
startMaster(server); startMaster(server);
@ -47,6 +52,10 @@ public class RestartActionBaseAction extends Action {
void restartRs(ServerName server, long sleepTime) throws IOException { void restartRs(ServerName server, long sleepTime) throws IOException {
sleepTime = Math.max(sleepTime, 1000); sleepTime = Math.max(sleepTime, 1000);
// Don't try the kill if we're stopping
if (context.isStopping()) {
return;
}
killRs(server); killRs(server);
sleep(sleepTime); sleep(sleepTime);
startRs(server); startRs(server);
@ -54,6 +63,10 @@ public class RestartActionBaseAction extends Action {
void restartZKNode(ServerName server, long sleepTime) throws IOException { void restartZKNode(ServerName server, long sleepTime) throws IOException {
sleepTime = Math.max(sleepTime, 1000); sleepTime = Math.max(sleepTime, 1000);
// Don't try the kill if we're stopping
if (context.isStopping()) {
return;
}
killZKNode(server); killZKNode(server);
sleep(sleepTime); sleep(sleepTime);
startZKNode(server); startZKNode(server);
@ -61,6 +74,10 @@ public class RestartActionBaseAction extends Action {
void restartDataNode(ServerName server, long sleepTime) throws IOException { void restartDataNode(ServerName server, long sleepTime) throws IOException {
sleepTime = Math.max(sleepTime, 1000); sleepTime = Math.max(sleepTime, 1000);
// Don't try the kill if we're stopping
if (context.isStopping()) {
return;
}
killDataNode(server); killDataNode(server);
sleep(sleepTime); sleep(sleepTime);
startDataNode(server); startDataNode(server);

View File

@ -63,7 +63,7 @@ public class RollingBatchRestartRsAction extends BatchRestartRsAction {
Queue<ServerName> deadServers = new LinkedList<ServerName>(); Queue<ServerName> deadServers = new LinkedList<ServerName>();
// loop while there are servers to be killed or dead servers to be restarted // loop while there are servers to be killed or dead servers to be restarted
while (!serversToBeKilled.isEmpty() || !deadServers.isEmpty()) { while ((!serversToBeKilled.isEmpty() || !deadServers.isEmpty()) && !context.isStopping()) {
KillOrStart action = KillOrStart.KILL; KillOrStart action = KillOrStart.KILL;
if (serversToBeKilled.isEmpty()) { // no more servers to kill if (serversToBeKilled.isEmpty()) { // no more servers to kill

View File

@ -44,6 +44,11 @@ public class SnapshotTableAction extends Action {
String snapshotName = tableName + "-it-" + System.currentTimeMillis(); String snapshotName = tableName + "-it-" + System.currentTimeMillis();
Admin admin = util.getHBaseAdmin(); Admin admin = util.getHBaseAdmin();
// Don't try the snapshot if we're stopping
if (context.isStopping()) {
return;
}
LOG.info("Performing action: Snapshot table " + tableName); LOG.info("Performing action: Snapshot table " + tableName);
admin.snapshot(snapshotName, tableName); admin.snapshot(snapshotName, tableName);
if (sleepTime > 0) { if (sleepTime > 0) {

View File

@ -34,7 +34,10 @@ public class SplitAllRegionOfTableAction extends Action {
public void perform() throws Exception { public void perform() throws Exception {
HBaseTestingUtility util = context.getHBaseIntegrationTestingUtility(); HBaseTestingUtility util = context.getHBaseIntegrationTestingUtility();
Admin admin = util.getHBaseAdmin(); Admin admin = util.getHBaseAdmin();
// Don't try the split if we're stopping
if (context.isStopping()) {
return;
}
LOG.info("Performing action: Split all regions of " + tableName); LOG.info("Performing action: Split all regions of " + tableName);
admin.split(tableName); admin.split(tableName);
} }

View File

@ -53,6 +53,10 @@ public class SplitRandomRegionOfTableAction extends Action {
LOG.info("Table " + tableName + " doesn't have regions to split"); LOG.info("Table " + tableName + " doesn't have regions to split");
return; return;
} }
// Don't try the split if we're stopping
if (context.isStopping()) {
return;
}
HRegionInfo region = PolicyBasedChaosMonkey.selectRandomItem( HRegionInfo region = PolicyBasedChaosMonkey.selectRandomItem(
regions.toArray(new HRegionInfo[regions.size()])); regions.toArray(new HRegionInfo[regions.size()]));

View File

@ -42,6 +42,11 @@ public class TruncateTableAction extends Action {
HBaseTestingUtility util = context.getHBaseIntegrationTestingUtility(); HBaseTestingUtility util = context.getHBaseIntegrationTestingUtility();
Admin admin = util.getHBaseAdmin(); Admin admin = util.getHBaseAdmin();
// Don't try the truncate if we're stopping
if (context.isStopping()) {
return;
}
boolean preserveSplits = random.nextBoolean(); boolean preserveSplits = random.nextBoolean();
LOG.info("Performing action: Truncate table " + tableName.getNameAsString() + LOG.info("Performing action: Truncate table " + tableName.getNameAsString() +
"preserve splits " + preserveSplits); "preserve splits " + preserveSplits);

View File

@ -19,8 +19,10 @@
package org.apache.hadoop.hbase.chaos.actions; package org.apache.hadoop.hbase.chaos.actions;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.HashSet;
import java.util.LinkedList; import java.util.LinkedList;
import java.util.List; import java.util.List;
import java.util.Set;
import org.apache.commons.lang.math.RandomUtils; import org.apache.commons.lang.math.RandomUtils;
import org.apache.hadoop.hbase.ClusterStatus; import org.apache.hadoop.hbase.ClusterStatus;
@ -51,6 +53,8 @@ public class UnbalanceKillAndRebalanceAction extends Action {
public void perform() throws Exception { public void perform() throws Exception {
ClusterStatus status = this.cluster.getClusterStatus(); ClusterStatus status = this.cluster.getClusterStatus();
List<ServerName> victimServers = new LinkedList<ServerName>(status.getServers()); List<ServerName> victimServers = new LinkedList<ServerName>(status.getServers());
Set<ServerName> killedServers = new HashSet<ServerName>();
int liveCount = (int)Math.ceil(FRC_SERVERS_THAT_HOARD_AND_LIVE * victimServers.size()); int liveCount = (int)Math.ceil(FRC_SERVERS_THAT_HOARD_AND_LIVE * victimServers.size());
int deadCount = (int)Math.ceil(FRC_SERVERS_THAT_HOARD_AND_DIE * victimServers.size()); int deadCount = (int)Math.ceil(FRC_SERVERS_THAT_HOARD_AND_DIE * victimServers.size());
Assert.assertTrue((liveCount + deadCount) < victimServers.size()); Assert.assertTrue((liveCount + deadCount) < victimServers.size());
@ -62,13 +66,20 @@ public class UnbalanceKillAndRebalanceAction extends Action {
unbalanceRegions(status, victimServers, targetServers, HOARD_FRC_OF_REGIONS); unbalanceRegions(status, victimServers, targetServers, HOARD_FRC_OF_REGIONS);
Thread.sleep(waitForUnbalanceMilliSec); Thread.sleep(waitForUnbalanceMilliSec);
for (int i = 0; i < liveCount; ++i) { for (int i = 0; i < liveCount; ++i) {
// Don't keep killing servers if we're
// trying to stop the monkey.
if (context.isStopping()) {
break;
}
killRs(targetServers.get(i)); killRs(targetServers.get(i));
killedServers.add(targetServers.get(i));
} }
Thread.sleep(waitForKillsMilliSec); Thread.sleep(waitForKillsMilliSec);
forceBalancer(); forceBalancer();
Thread.sleep(waitAfterBalanceMilliSec); Thread.sleep(waitAfterBalanceMilliSec);
for (int i = 0; i < liveCount; ++i) { for (ServerName server:killedServers) {
startRs(targetServers.get(i)); startRs(server);
} }
} }
} }

View File

@ -35,14 +35,29 @@ public abstract class Policy extends StoppableImplementation implements Runnable
public void init(PolicyContext context) throws Exception { public void init(PolicyContext context) throws Exception {
this.context = context; this.context = context;
// Used to wire up stopping.
context.setPolicy(this);
} }
/** /**
* A context for a Policy * A context for a Policy
*/ */
public static class PolicyContext extends Action.ActionContext { public static class PolicyContext extends Action.ActionContext {
Policy policy = null;
public PolicyContext(IntegrationTestingUtility util) { public PolicyContext(IntegrationTestingUtility util) {
super(util); super(util);
} }
/**
 * Reports whether the policy attached to this context has been stopped.
 *
 * The policy reference starts out null and is only assigned through
 * setPolicy, so a context that has not been wired to a policy yet is
 * treated as "not stopping" instead of failing with a NullPointerException.
 *
 * @return true if a policy is attached and it reports itself as stopped,
 *         false otherwise
 */
@Override
public boolean isStopping() {
  return policy != null && policy.isStopped();
}
public void setPolicy(Policy policy) {
this.policy = policy;
}
} }
} }