HBASE-7200 hbase-it/src/test/java/org/apache/hadoop/hbase/IngestIntegrationTestBase.java

git-svn-id: https://svn.apache.org/repos/asf/hbase/trunk@1414559 13f79535-47bb-0310-9956-ffa450edef68
Michael Stack 2012-11-28 07:15:13 +00:00
parent 54a761ed2d
commit 4d4129231b
5 changed files with 417 additions and 129 deletions

View File: hbase-it/src/test/java/org/apache/hadoop/hbase/IngestIntegrationTestBase.java

@@ -0,0 +1,123 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase;
import java.io.IOException;
import junit.framework.Assert;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.LoadTestTool;
/**
* A base class for tests that do something with the cluster while running
* {@link LoadTestTool} to write and verify some data.
*/
public abstract class IngestIntegrationTestBase {
private static String tableName = null;
/** A soft limit on how long we should run */
private static final String RUN_TIME_KEY = "hbase.%s.runtime";
protected static final Log LOG = LogFactory.getLog(IngestIntegrationTestBase.class);
protected IntegrationTestingUtility util;
protected HBaseCluster cluster;
private LoadTestTool loadTool;
protected void setUp(int numSlavesBase) throws Exception {
tableName = this.getClass().getSimpleName();
util = new IntegrationTestingUtility();
LOG.info("Initializing cluster with " + numSlavesBase + " servers");
util.initializeCluster(numSlavesBase);
LOG.info("Done initializing cluster");
cluster = util.getHBaseClusterInterface();
deleteTableIfNecessary();
loadTool = new LoadTestTool();
loadTool.setConf(util.getConfiguration());
// Initialize load test tool before we start breaking things;
// LoadTestTool init, even when it is a no-op, is very fragile.
int ret = loadTool.run(new String[] { "-tn", tableName, "-init_only" });
Assert.assertEquals("Failed to initialize LoadTestTool", 0, ret);
}
protected void tearDown() throws Exception {
LOG.info("Restoring the cluster");
util.restoreCluster();
LOG.info("Done restoring the cluster");
}
private void deleteTableIfNecessary() throws IOException {
if (util.getHBaseAdmin().tableExists(tableName)) {
util.deleteTable(Bytes.toBytes(tableName));
}
}
protected void runIngestTest(long defaultRunTime, int keysPerServerPerIter,
int colsPerKey, int recordSize, int writeThreads) throws Exception {
LOG.info("Running ingest");
LOG.info("Cluster size:" + util.getHBaseClusterInterface().getClusterStatus().getServersSize());
long start = System.currentTimeMillis();
String runtimeKey = String.format(RUN_TIME_KEY, this.getClass().getSimpleName());
long runtime = util.getConfiguration().getLong(runtimeKey, defaultRunTime);
long startKey = 0;
long numKeys = getNumKeys(keysPerServerPerIter);
while (System.currentTimeMillis() - start < 0.9 * runtime) {
LOG.info("Intended run time: " + (runtime/60000) + " min, left:" +
((runtime - (System.currentTimeMillis() - start))/60000) + " min");
int ret = loadTool.run(new String[] {
"-tn", tableName,
"-write", String.format("%d:%d:%d", colsPerKey, recordSize, writeThreads),
"-start_key", String.valueOf(startKey),
"-num_keys", String.valueOf(numKeys),
"-skip_init"
});
if (0 != ret) {
String errorMsg = "Load failed with error code " + ret;
LOG.error(errorMsg);
Assert.fail(errorMsg);
}
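// Read pass below: "100:20" = verify 100% of the keys just written, using 20
// reader threads (LoadTestTool's -read format: <verify_percent>[:<#threads>]).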
ret = loadTool.run(new String[] {
"-tn", tableName,
"-read", "100:20",
"-start_key", String.valueOf(startKey),
"-num_keys", String.valueOf(numKeys),
"-skip_init"
});
if (0 != ret) {
String errorMsg = "Verification failed with error code " + ret;
LOG.error(errorMsg);
Assert.fail(errorMsg);
}
startKey += numKeys;
}
}
/** Computes the number of keys to ingest based on the cluster size */
private long getNumKeys(int keysPerServer)
throws IOException {
int numRegionServers = cluster.getClusterStatus().getServersSize();
return keysPerServer * numRegionServers;
}
}
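For orientation, a subclass is expected to bridge JUnit's lifecycle annotations to the protected hooks above. A minimal sketch (the class name is hypothetical; the sizing values mirror the tests later in this commit):

import org.junit.After;
import org.junit.Before;
import org.junit.Test;
import org.junit.experimental.categories.Category;

@Category(IntegrationTests.class)
public class ExampleIngestTest extends IngestIntegrationTestBase {
  @Before
  public void setUp() throws Exception {
    super.setUp(4); // the smallest cluster the tests below run against
  }

  @After
  public void tearDown() throws Exception {
    super.tearDown();
  }

  @Test
  public void testDataIngest() throws Exception {
    // 5 min soft limit, 2500 keys/server/iteration, 10 cols/key,
    // 100-byte records, 20 writer threads.
    runIngestTest(5 * 60 * 1000, 2500, 10, 100, 20);
  }
}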

View File: IntegrationTestDataIngestWithChaosMonkey.java

@@ -22,11 +22,8 @@ import java.io.IOException;
import junit.framework.Assert;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.ChaosMonkey;
import org.apache.hadoop.hbase.util.LoadTestTool;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
@@ -39,95 +36,33 @@ import org.junit.experimental.categories.Category;
* configuration parameter.
*/
@Category(IntegrationTests.class)
public class IntegrationTestDataIngestWithChaosMonkey {
public class IntegrationTestDataIngestWithChaosMonkey extends IngestIntegrationTestBase {
private static final String TABLE_NAME = "TestDataIngestWithChaosMonkey";
private static final int NUM_SLAVES_BASE = 4; //number of slaves for the smallest cluster
/** A soft limit on how long we should run */
private static final String RUN_TIME_KEY = "hbase.IntegrationTestDataIngestWithChaosMonkey.runtime";
//run for 5 min by default
// run for 5 min by default
private static final long DEFAULT_RUN_TIME = 5 * 60 * 1000;
private static final Log LOG = LogFactory.getLog(IntegrationTestDataIngestWithChaosMonkey.class);
private IntegrationTestingUtility util;
private HBaseCluster cluster;
private ChaosMonkey monkey;
@Before
public void setUp() throws Exception {
util = new IntegrationTestingUtility();
util.initializeCluster(NUM_SLAVES_BASE);
cluster = util.getHBaseClusterInterface();
deleteTableIfNecessary();
super.setUp(NUM_SLAVES_BASE);
monkey = new ChaosMonkey(util, ChaosMonkey.EVERY_MINUTE_RANDOM_ACTION_POLICY);
monkey.start();
}
@After
public void tearDown() throws Exception {
monkey.stop("test has finished, that's why");
monkey.waitForStop();
util.restoreCluster();
}
private void deleteTableIfNecessary() throws IOException {
if (util.getHBaseAdmin().tableExists(TABLE_NAME)) {
util.deleteTable(Bytes.toBytes(TABLE_NAME));
if (monkey != null) {
monkey.stop("test has finished, that's why");
monkey.waitForStop();
}
super.tearDown();
}
@Test
public void testDataIngest() throws Exception {
LOG.info("Running testDataIngest");
LOG.info("Cluster size:" + util.getHBaseClusterInterface().getClusterStatus().getServersSize());
LoadTestTool loadTool = new LoadTestTool();
loadTool.setConf(util.getConfiguration());
long start = System.currentTimeMillis();
long runtime = util.getConfiguration().getLong(RUN_TIME_KEY, DEFAULT_RUN_TIME);
long startKey = 0;
long numKeys = estimateDataSize();
while (System.currentTimeMillis() - start < 0.9 * runtime) {
LOG.info("Intended run time: " + (runtime/60000) + " min, left:" +
((runtime - (System.currentTimeMillis() - start))/60000) + " min");
int ret = loadTool.run(new String[] {
"-tn", TABLE_NAME,
"-write", "10:100:20",
"-start_key", String.valueOf(startKey),
"-num_keys", String.valueOf(numKeys)
});
//assert that load was successful
Assert.assertEquals(0, ret);
ret = loadTool.run(new String[] {
"-tn", TABLE_NAME,
"-read", "100:20",
"-start_key", String.valueOf(startKey),
"-num_keys", String.valueOf(numKeys)
});
//assert that verify was successful
Assert.assertEquals(0, ret);
startKey += numKeys;
}
}
/** Estimates a data size based on the cluster size */
protected long estimateDataSize() throws IOException {
//base is a 4 slave node cluster.
ClusterStatus status = cluster.getClusterStatus();
int numRegionServers = status.getServersSize();
int multiplication = Math.max(1, numRegionServers / NUM_SLAVES_BASE);
return 10000 * multiplication;
runIngestTest(DEFAULT_RUN_TIME, 2500, 10, 100, 20);
}
}
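Note that the soft run-time limit is keyed per test class: RUN_TIME_KEY ("hbase.%s.runtime") is formatted with the simple class name, so for this test it resolves to "hbase.IntegrationTestDataIngestWithChaosMonkey.runtime". A sketch of overriding the 5 minute default, assuming the setting reaches the configuration that IntegrationTestingUtility loads (e.g. via hbase-site.xml); the helper below is hypothetical, not part of this commit:

static void shortenRuntime(org.apache.hadoop.conf.Configuration conf) {
  String runtimeKey = String.format("hbase.%s.runtime",
      IntegrationTestDataIngestWithChaosMonkey.class.getSimpleName());
  conf.setLong(runtimeKey, 60 * 1000L); // one-minute smoke run instead of 5 min
}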

View File: IntegrationTestRebalanceAndKillServers.java

@@ -0,0 +1,100 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase;
import java.io.IOException;
import junit.framework.Assert;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.ChaosMonkey;
import org.apache.hadoop.hbase.util.LoadTestTool;
import org.apache.hadoop.hbase.util.Pair;
import org.apache.hadoop.hbase.util.ChaosMonkey.Action;
import org.apache.hadoop.hbase.util.ChaosMonkey.RestartActiveMaster;
import org.apache.hadoop.hbase.util.ChaosMonkey.RestartRandomRs;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import com.google.common.collect.Lists;
/**
* A system test which does large data ingestion and verification using {@link LoadTestTool},
* while killing the region servers and the master(s) randomly. You can configure how long
* the load test should run by using the "hbase.IntegrationTestRebalanceAndKillServers.runtime"
* configuration parameter.
*/
@Category(IntegrationTests.class)
public class IntegrationTestRebalanceAndKillServers extends IngestIntegrationTestBase {
private static final int NUM_SLAVES_BASE = 4; // number of slaves for the smallest cluster
private static final long DEFAULT_RUN_TIME = 5 * 60 * 1000; // run for 5 min by default
private static final long KILL_SERVICE_EVERY_MS = 45 * 1000;
private static final int SERVER_PER_MASTER_KILL = 3;
private static final long KILL_SERVER_FOR_MS = 5 * 1000;
private static final long KILL_MASTER_FOR_MS = 100;
private static final long UNBALANCE_REGIONS_EVERY_MS = 30 * 1000;
/** @see ChaosMonkey.UnbalanceRegionsAction#UnbalanceRegionsAction(double, double) */
private static final double UNBALANCE_TO_FRC_OF_SERVERS = 0.5;
/** @see ChaosMonkey.UnbalanceRegionsAction#UnbalanceRegionsAction(double, double) */
private static final double UNBALANCE_FRC_OF_REGIONS = 0.5;
private static final long BALANCE_REGIONS_EVERY_MS = 10 * 1000;
private ChaosMonkey monkey;
@Before
@SuppressWarnings("unchecked")
public void setUp() throws Exception {
super.setUp(NUM_SLAVES_BASE);
ChaosMonkey.Policy killPolicy = new ChaosMonkey.PeriodicRandomActionPolicy(
KILL_SERVICE_EVERY_MS,
new Pair<Action,Integer>(new ChaosMonkey.RestartActiveMaster(KILL_MASTER_FOR_MS), 1),
new Pair<Action,Integer>(new ChaosMonkey.RestartRandomRs(KILL_SERVER_FOR_MS), SERVER_PER_MASTER_KILL));
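// With these weights, a RestartRandomRs action is SERVER_PER_MASTER_KILL (3) times
// as likely to be drawn as a RestartActiveMaster; one weighted action fires per
// KILL_SERVICE_EVERY_MS (45 s) period.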
ChaosMonkey.Policy unbalancePolicy = new ChaosMonkey.PeriodicRandomActionPolicy(
UNBALANCE_REGIONS_EVERY_MS,
new ChaosMonkey.UnbalanceRegionsAction(UNBALANCE_FRC_OF_REGIONS, UNBALANCE_TO_FRC_OF_SERVERS));
ChaosMonkey.Policy balancePolicy = new ChaosMonkey.PeriodicRandomActionPolicy(
BALANCE_REGIONS_EVERY_MS, new ChaosMonkey.ForceBalancerAction());
monkey = new ChaosMonkey(util, killPolicy, unbalancePolicy, balancePolicy);
monkey.start();
}
@After
public void tearDown() throws Exception {
if (monkey != null) {
monkey.stop("tearDown");
monkey.waitForStop();
}
super.tearDown();
}
@Test
public void testDataIngest() throws Exception {
runIngestTest(DEFAULT_RUN_TIME, 2500, 10, 100, 20);
}
}

View File: ChaosMonkey.java

@@ -20,12 +20,15 @@ package org.apache.hadoop.hbase.util;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.HashSet;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Queue;
import java.util.Random;
import java.util.Set;
import org.apache.commons.cli.CommandLine;
import org.apache.commons.logging.Log;
@@ -34,15 +37,19 @@ import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.ClusterStatus;
import org.apache.hadoop.hbase.HBaseCluster;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HRegionInfo;
import org.apache.hadoop.hbase.IntegrationTestingUtility;
import org.apache.hadoop.hbase.IntegrationTestDataIngestWithChaosMonkey;
import org.apache.hadoop.hbase.ServerLoad;
import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.Stoppable;
import org.apache.hadoop.hbase.client.HBaseAdmin;
import org.apache.hadoop.util.StringUtils;
import org.apache.hadoop.util.ToolRunner;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import com.google.protobuf.ServiceException;
/**
* A utility that injects faults into a running cluster.
@@ -86,6 +93,16 @@ public class ChaosMonkey extends AbstractHBaseTool implements Stoppable {
setPoliciesByName(policies);
}
/**
* Construct a new ChaosMonkey
* @param util the IntegrationTestingUtility, already configured
* @param policies custom policies to use
*/
public ChaosMonkey(IntegrationTestingUtility util, Policy... policies) {
this.util = util;
this.policies = policies;
}
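// Example usage (see IntegrationTestRebalanceAndKillServers in this commit):
//   new ChaosMonkey(util, killPolicy, unbalancePolicy, balancePolicy)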
private void setPoliciesByName(String... policies) {
this.policies = new Policy[policies.length];
for (int i=0; i < policies.length; i++) {
@@ -115,16 +132,16 @@ public class ChaosMonkey extends AbstractHBaseTool implements Stoppable {
/**
* A (possibly mischievous) action that the ChaosMonkey can perform.
*/
private static class Action {
long sleepTime; //how long should we sleep
ActionContext context;
HBaseCluster cluster;
ClusterStatus initialStatus;
ServerName[] initialServers;
public static class Action {
// TODO: interesting question - should actions be implemented inside
// ChaosMonkey, or outside? If they are inside (initial), the class becomes
// huge and all-encompassing; if they are outside ChaosMonkey becomes just
// a random task scheduler. For now, keep inside.
public Action(long sleepTime) {
this.sleepTime = sleepTime;
}
protected ActionContext context;
protected HBaseCluster cluster;
protected ClusterStatus initialStatus;
protected ServerName[] initialServers;
void init(ActionContext context) throws Exception {
this.context = context;
@@ -136,33 +153,28 @@ public class ChaosMonkey extends AbstractHBaseTool implements Stoppable {
void perform() throws Exception { }
// TODO: perhaps these methods should be elsewhere?
/** Returns current region servers */
ServerName[] getCurrentServers() throws IOException {
protected ServerName[] getCurrentServers() throws IOException {
Collection<ServerName> regionServers = cluster.getClusterStatus().getServers();
return regionServers.toArray(new ServerName[regionServers.size()]);
}
void killMaster(ServerName server) throws IOException {
protected void killMaster(ServerName server) throws IOException {
LOG.info("Killing master:" + server);
cluster.killMaster(server);
cluster.waitForMasterToStop(server, TIMEOUT);
LOG.info("Killed master server:" + server);
}
void startMaster(ServerName server) throws IOException {
protected void startMaster(ServerName server) throws IOException {
LOG.info("Starting master:" + server.getHostname());
cluster.startMaster(server.getHostname());
cluster.waitForActiveAndReadyMaster(TIMEOUT);
LOG.info("Started master: " + server);
}
void restartMaster(ServerName server, long sleepTime) throws IOException {
killMaster(server);
sleep(sleepTime);
startMaster(server);
}
void killRs(ServerName server) throws IOException {
protected void killRs(ServerName server) throws IOException {
LOG.info("Killing region server:" + server);
cluster.killRegionServer(server);
cluster.waitForRegionServerToStop(server, TIMEOUT);
@@ -170,19 +182,33 @@ public class ChaosMonkey extends AbstractHBaseTool implements Stoppable {
+ cluster.getClusterStatus().getServersSize());
}
void startRs(ServerName server) throws IOException {
protected void startRs(ServerName server) throws IOException {
LOG.info("Starting region server:" + server.getHostname());
cluster.startRegionServer(server.getHostname());
cluster.waitForRegionServerToStart(server.getHostname(), TIMEOUT);
LOG.info("Started region server:" + server + ". Reported num of rs:"
+ cluster.getClusterStatus().getServersSize());
}
}
private static class RestartActionBase extends Action {
long sleepTime; // how long should we sleep
public RestartActionBase(long sleepTime) {
this.sleepTime = sleepTime;
}
void sleep(long sleepTime) {
LOG.info("Sleeping for:" + sleepTime);
Threads.sleep(sleepTime);
}
void restartMaster(ServerName server, long sleepTime) throws IOException {
killMaster(server);
sleep(sleepTime);
startMaster(server);
}
void restartRs(ServerName server, long sleepTime) throws IOException {
killRs(server);
sleep(sleepTime);
@@ -190,7 +216,7 @@ public class ChaosMonkey extends AbstractHBaseTool implements Stoppable {
}
}
private static class RestartActiveMaster extends Action {
public static class RestartActiveMaster extends RestartActionBase {
public RestartActiveMaster(long sleepTime) {
super(sleepTime);
}
@@ -203,16 +229,11 @@ public class ChaosMonkey extends AbstractHBaseTool implements Stoppable {
}
}
private static class RestartRandomRs extends Action {
public static class RestartRandomRs extends RestartActionBase {
public RestartRandomRs(long sleepTime) {
super(sleepTime);
}
@Override
void init(ActionContext context) throws Exception {
super.init(context);
}
@Override
void perform() throws Exception {
LOG.info("Performing action: Restart random region server");
@@ -222,7 +243,7 @@ public class ChaosMonkey extends AbstractHBaseTool implements Stoppable {
}
}
private static class RestartRsHoldingMeta extends RestartRandomRs {
public static class RestartRsHoldingMeta extends RestartRandomRs {
public RestartRsHoldingMeta(long sleepTime) {
super(sleepTime);
}
@@ -238,7 +259,7 @@ public class ChaosMonkey extends AbstractHBaseTool implements Stoppable {
}
}
private static class RestartRsHoldingRoot extends RestartRandomRs {
public static class RestartRsHoldingRoot extends RestartRandomRs {
public RestartRsHoldingRoot(long sleepTime) {
super(sleepTime);
}
@@ -257,7 +278,7 @@ public class ChaosMonkey extends AbstractHBaseTool implements Stoppable {
/**
* Restarts a ratio of the running regionservers at the same time
*/
private static class BatchRestartRs extends Action {
public static class BatchRestartRs extends RestartActionBase {
float ratio; //ratio of regionservers to restart
public BatchRestartRs(long sleepTime, float ratio) {
@@ -265,11 +286,6 @@ public class ChaosMonkey extends AbstractHBaseTool implements Stoppable {
this.ratio = ratio;
}
@Override
void init(ActionContext context) throws Exception {
super.init(context);
}
@Override
void perform() throws Exception {
LOG.info(String.format("Performing action: Batch restarting %d%% of region servers",
@@ -307,7 +323,7 @@ public class ChaosMonkey extends AbstractHBaseTool implements Stoppable {
* Restarts a ratio of the regionservers in a rolling fashion. At each step, either kills a
* server, or starts one, sleeping randomly (0-sleepTime) in between steps.
*/
private static class RollingBatchRestartRs extends BatchRestartRs {
public static class RollingBatchRestartRs extends BatchRestartRs {
public RollingBatchRestartRs(long sleepTime, float ratio) {
super(sleepTime, ratio);
}
@@ -346,6 +362,71 @@ public class ChaosMonkey extends AbstractHBaseTool implements Stoppable {
}
}
public static class UnbalanceRegionsAction extends Action {
private double fractionOfRegions;
private double fractionOfServers;
private Random random = new Random();
/**
* Unbalances the regions on the cluster by choosing "target" servers, and moving
* some regions from each of the non-target servers to random target servers.
* @param fractionOfRegions Fraction of regions to move from each server.
* @param fractionOfServers Fraction of servers to be chosen as targets.
*/
public UnbalanceRegionsAction(double fractionOfRegions, double fractionOfServers) {
this.fractionOfRegions = fractionOfRegions;
this.fractionOfServers = fractionOfServers;
}
@Override
void perform() throws Exception {
LOG.info("Unbalancing regions");
ClusterStatus status = this.cluster.getClusterStatus();
List<ServerName> victimServers = new LinkedList<ServerName>(status.getServers());
int targetServerCount = (int)Math.ceil(fractionOfServers * victimServers.size());
List<byte[]> targetServers = new ArrayList<byte[]>(targetServerCount);
for (int i = 0; i < targetServerCount; ++i) {
int victimIx = random.nextInt(victimServers.size());
String serverName = victimServers.remove(victimIx).getServerName();
targetServers.add(Bytes.toBytes(serverName));
}
List<byte[]> victimRegions = new LinkedList<byte[]>();
for (ServerName server : victimServers) {
ServerLoad serverLoad = status.getLoad(server);
// Ugh.
List<byte[]> regions = new LinkedList<byte[]>(serverLoad.getRegionsLoad().keySet());
int victimRegionCount = (int)Math.ceil(fractionOfRegions * regions.size());
LOG.debug("Removing " + victimRegionCount + " regions from " + server.getServerName());
for (int i = 0; i < victimRegionCount; ++i) {
int victimIx = random.nextInt(regions.size());
String regionId = HRegionInfo.encodeRegionName(regions.remove(victimIx));
victimRegions.add(Bytes.toBytes(regionId));
}
}
LOG.info("Moving " + victimRegions.size() + " regions from " + victimServers.size()
+ " servers to " + targetServers.size() + " different servers");
HBaseAdmin admin = this.context.getHaseIntegrationTestingUtility().getHBaseAdmin();
for (byte[] victimRegion : victimRegions) {
int targetIx = random.nextInt(targetServers.size());
admin.move(victimRegion, targetServers.get(targetIx));
}
}
}
public static class ForceBalancerAction extends Action {
@Override
void perform() throws Exception {
LOG.info("Balancing regions");
HBaseAdmin admin = this.context.getHaseIntegrationTestingUtility().getHBaseAdmin();
boolean result = admin.balancer();
if (!result) {
LOG.error("Balancer didn't succeed");
}
}
}
/**
* A context for a Policy
*/
@@ -358,7 +439,7 @@ public class ChaosMonkey extends AbstractHBaseTool implements Stoppable {
/**
* A policy to introduce chaos to the cluster
*/
private static abstract class Policy extends StoppableImplementation implements Runnable {
public static abstract class Policy extends StoppableImplementation implements Runnable {
PolicyContext context;
public void init(PolicyContext context) throws Exception {
this.context = context;
@@ -369,19 +450,32 @@ public class ChaosMonkey extends AbstractHBaseTool implements Stoppable {
* A policy which picks a random action according to the given weights
* and performs it every configurable period.
*/
private static class PeriodicRandomActionPolicy extends Policy {
private long period;
public static class PeriodicRandomActionPolicy extends Policy {
private long periodMs;
private List<Pair<Action, Integer>> actions;
PeriodicRandomActionPolicy(long period, List<Pair<Action, Integer>> actions) {
this.period = period;
public PeriodicRandomActionPolicy(long periodMs, List<Pair<Action, Integer>> actions) {
this.periodMs = periodMs;
this.actions = actions;
}
public PeriodicRandomActionPolicy(long periodMs, Pair<Action, Integer>... actions) {
// Arrays.asList returns a fixed-size view; we don't expect the list to be modified.
this(periodMs, Arrays.asList(actions));
}
public PeriodicRandomActionPolicy(long periodMs, Action... actions) {
this.periodMs = periodMs;
this.actions = new ArrayList<Pair<Action, Integer>>(actions.length);
for (Action action : actions) {
this.actions.add(new Pair<Action, Integer>(action, 1));
}
}
@Override
public void run() {
//add some jitter
int jitter = new Random().nextInt((int)period);
int jitter = new Random().nextInt((int)periodMs);
LOG.info("Sleeping for " + jitter + " to add jitter");
Threads.sleep(jitter);
@@ -396,7 +490,7 @@ public class ChaosMonkey extends AbstractHBaseTool implements Stoppable {
+ StringUtils.stringifyException(ex));
}
long sleepTime = period - (System.currentTimeMillis() - start);
long sleepTime = periodMs - (System.currentTimeMillis() - start);
if (sleepTime > 0) {
LOG.info("Sleeping for:" + sleepTime);
Threads.sleep(sleepTime);
@@ -407,7 +501,7 @@ public class ChaosMonkey extends AbstractHBaseTool implements Stoppable {
@Override
public void init(PolicyContext context) throws Exception {
super.init(context);
LOG.info("Using ChaosMonkey Policy: " + this.getClass() + ", period:" + period);
LOG.info("Using ChaosMonkey Policy: " + this.getClass() + ", period:" + periodMs);
for (Pair<Action, Integer> action : actions) {
action.getFirst().init(this.context);
}
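The weighted draw itself happens in code outside these hunks. Conceptually it picks each action with probability proportional to its Integer weight; a self-contained sketch (the class and method names are assumptions, not the committed helper):

import java.util.List;
import java.util.Random;
import org.apache.hadoop.hbase.util.Pair;

final class WeightedPick {
  /** Chooses an item with probability weight_i / sum(weights). */
  static <T> T pick(List<Pair<T, Integer>> items, Random rnd) {
    int total = 0;
    for (Pair<T, Integer> p : items) {
      total += p.getSecond();
    }
    int roll = rnd.nextInt(total); // uniform in [0, total)
    for (Pair<T, Integer> p : items) {
      roll -= p.getSecond();
      if (roll < 0) {
        return p.getFirst();
      }
    }
    throw new IllegalStateException("weights must be positive");
  }
}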

View File: LoadTestTool.java

@@ -27,6 +27,7 @@ import org.apache.hadoop.hbase.HColumnDescriptor;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.PerformanceEvaluation;
import org.apache.hadoop.hbase.PleaseHoldException;
import org.apache.hadoop.hbase.client.HBaseAdmin;
import org.apache.hadoop.hbase.io.compress.Compression;
import org.apache.hadoop.hbase.io.encoding.DataBlockEncoding;
@@ -95,6 +96,8 @@ public class LoadTestTool extends AbstractHBaseTool {
private static final String OPT_START_KEY = "start_key";
private static final String OPT_TABLE_NAME = "tn";
private static final String OPT_ZK_QUORUM = "zk";
private static final String OPT_SKIP_INIT = "skip_init";
private static final String OPT_INIT_ONLY = "init_only";
private static final long DEFAULT_START_KEY = 0;
@@ -126,6 +129,11 @@ public class LoadTestTool extends AbstractHBaseTool {
private int maxReadErrors = MultiThreadedReader.DEFAULT_MAX_ERRORS;
private int verifyPercent;
// TODO: refactor LoadTestToolImpl somewhere to make the usage from tests less bad,
// console tool itself should only be used from console.
private boolean isSkipInit = false;
private boolean isInitOnly = false;
private String[] splitColonSeparated(String option,
int minNumCols, int maxNumCols) {
String optVal = cmd.getOptionValue(option);
@@ -186,6 +194,7 @@ public class LoadTestTool extends AbstractHBaseTool {
addOptWithArg(OPT_TABLE_NAME, "The name of the table to read or write");
addOptWithArg(OPT_WRITE, OPT_USAGE_LOAD);
addOptWithArg(OPT_READ, OPT_USAGE_READ);
addOptNoArg(OPT_INIT_ONLY, "Initialize the test table only, don't do any loading");
addOptWithArg(OPT_BLOOM, OPT_USAGE_BLOOM);
addOptWithArg(OPT_COMPRESSION, OPT_USAGE_COMPRESSION);
addOptWithArg(OPT_DATA_BLOCK_ENCODING, OPT_DATA_BLOCK_ENCODING_USAGE);
@@ -200,10 +209,12 @@ public class LoadTestTool extends AbstractHBaseTool {
"separate puts for every column in a row");
addOptNoArg(OPT_ENCODE_IN_CACHE_ONLY, OPT_ENCODE_IN_CACHE_ONLY_USAGE);
addRequiredOptWithArg(OPT_NUM_KEYS, "The number of keys to read/write");
addOptWithArg(OPT_NUM_KEYS, "The number of keys to read/write");
addOptWithArg(OPT_START_KEY, "The first key to read/write " +
"(a 0-based index). The default value is " +
DEFAULT_START_KEY + ".");
addOptNoArg(OPT_SKIP_INIT, "Skip the initialization; assume test table "
+ "already exists");
}
@Override
@@ -212,20 +223,35 @@ public class LoadTestTool extends AbstractHBaseTool {
tableName = Bytes.toBytes(cmd.getOptionValue(OPT_TABLE_NAME,
DEFAULT_TABLE_NAME));
startKey = parseLong(cmd.getOptionValue(OPT_START_KEY,
String.valueOf(DEFAULT_START_KEY)), 0, Long.MAX_VALUE);
long numKeys = parseLong(cmd.getOptionValue(OPT_NUM_KEYS), 1,
Long.MAX_VALUE - startKey);
endKey = startKey + numKeys;
isWrite = cmd.hasOption(OPT_WRITE);
isRead = cmd.hasOption(OPT_READ);
isInitOnly = cmd.hasOption(OPT_INIT_ONLY);
if (!isWrite && !isRead) {
if (!isWrite && !isRead && !isInitOnly) {
throw new IllegalArgumentException("Either -" + OPT_WRITE + ", -" + OPT_READ +
" or -" + OPT_INIT_ONLY + " has to be specified");
}
if (isInitOnly && (isRead || isWrite)) {
throw new IllegalArgumentException(OPT_INIT_ONLY + " cannot be specified with"
+ " either -" + OPT_WRITE + " or -" + OPT_READ);
}
if (!isInitOnly) {
if (!cmd.hasOption(OPT_NUM_KEYS)) {
throw new IllegalArgumentException(OPT_NUM_KEYS + " must be specified in "
+ "read or write mode");
}
startKey = parseLong(cmd.getOptionValue(OPT_START_KEY,
String.valueOf(DEFAULT_START_KEY)), 0, Long.MAX_VALUE);
long numKeys = parseLong(cmd.getOptionValue(OPT_NUM_KEYS), 1,
Long.MAX_VALUE - startKey);
endKey = startKey + numKeys;
isSkipInit = cmd.hasOption(OPT_SKIP_INIT);
System.out.println("Key range: [" + startKey + ".." + (endKey - 1) + "]");
}
encodeInCacheOnly = cmd.hasOption(OPT_ENCODE_IN_CACHE_ONLY);
parseColumnFamilyOptions(cmd);
@@ -274,8 +300,6 @@ public class LoadTestTool extends AbstractHBaseTool {
System.out.println("Percent of keys to verify: " + verifyPercent);
System.out.println("Reader threads: " + numReaderThreads);
}
System.out.println("Key range: [" + startKey + ".." + (endKey - 1) + "]");
}
private void parseColumnFamilyOptions(CommandLine cmd) {
@@ -296,15 +320,27 @@ public class LoadTestTool extends AbstractHBaseTool {
StoreFile.BloomType.valueOf(bloomStr);
}
public void initTestTable() throws IOException {
HBaseTestingUtility.createPreSplitLoadTestTable(conf, tableName,
COLUMN_FAMILY, compressAlgo, dataBlockEncodingAlgo);
applyColumnFamilyOptions(tableName, COLUMN_FAMILIES);
}
@Override
protected int doWork() throws IOException {
if (cmd.hasOption(OPT_ZK_QUORUM)) {
conf.set(HConstants.ZOOKEEPER_QUORUM, cmd.getOptionValue(OPT_ZK_QUORUM));
}
HBaseTestingUtility.createPreSplitLoadTestTable(conf, tableName,
COLUMN_FAMILY, compressAlgo, dataBlockEncodingAlgo);
applyColumnFamilyOptions(tableName, COLUMN_FAMILIES);
if (isInitOnly) {
LOG.info("Initializing only; no reads or writes");
initTestTable();
return 0;
}
if (!isSkipInit) {
initTestTable();
}
if (isWrite) {
writerThreads = new MultiThreadedWriter(conf, tableName, COLUMN_FAMILY);
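Taken together, the two new flags give callers the split workflow that IngestIntegrationTestBase relies on: one -init_only run to create the pre-split table, then repeated -write/-read passes with -skip_init. A sketch of driving that programmatically (table name and sizes are illustrative):

import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.util.LoadTestTool;

public class LoadTestTwoPhaseExample {
  public static void main(String[] args) throws Exception {
    LoadTestTool tool = new LoadTestTool();
    tool.setConf(HBaseConfiguration.create());

    // Phase 1: create and pre-split the test table, then exit.
    int rc = tool.run(new String[] { "-tn", "loadtest_table", "-init_only" });

    // Phase 2: write against the existing table, skipping initialization.
    if (rc == 0) {
      rc = tool.run(new String[] { "-tn", "loadtest_table",
          "-write", "10:100:20", // colsPerKey:recordSize:writeThreads
          "-start_key", "0", "-num_keys", "100000",
          "-skip_init" });
    }
    System.exit(rc);
  }
}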