[jira] [HBASE-4908] HBase cluster test tool (port from 0.89-fb)

Summary:
Porting one of our HBase cluster test tools (a single-process multi-threaded
load generator and verifier) from 0.89-fb to trunk.
I cleaned up the code a bit compared to what's in 0.89-fb, and discovered that
it has some features that I have not tried yet (some kind of a kill test, and
some way to run HBase as multiple processes on one machine).
The main utility of this piece of code for us has been the HBaseClusterTest
command-line tool (called HBaseTest in 0.89-fb), which we usually invoke as a
load test in our five-node dev cluster testing, e.g.:

hbase org.apache.hadoop.hbase.util.LoadTestTool -write 50:100:20 -tn loadtest4
-read 100:10 -zk <zk_quorum_node> -bloom ROWCOL -compression LZO -key_window 5
-max_read_errors 10000 -num_keys 10000000000 -start_key 0

Test Plan:
Run this on a dev cluster. Run all unit tests.

Reviewers: stack, Karthik, Kannan, nspiegelberg, JIRA

Reviewed By: nspiegelberg

CC: stack, nspiegelberg, mbautin, Karthik

Differential Revision: 549

git-svn-id: https://svn.apache.org/repos/asf/hbase/trunk@1211746 13f79535-47bb-0310-9956-ffa450edef68
Author: Nicolas Spiegelberg, 2011-12-08 02:38:27 +00:00
parent 77dae7ccb7
commit 09d0aff149
35 changed files with 2584 additions and 253 deletions

View File

@@ -867,6 +867,7 @@
<commons-cli.version>1.2</commons-cli.version>
<commons-codec.version>1.4</commons-codec.version>
<commons-httpclient.version>3.1</commons-httpclient.version><!-- pretty outdated -->
<commons-io.version>2.1</commons-io.version>
<commons-lang.version>2.5</commons-lang.version>
<commons-logging.version>1.1.1</commons-logging.version>
<commons-math.version>2.1</commons-math.version>
@@ -954,6 +955,11 @@
<artifactId>commons-httpclient</artifactId>
<version>${commons-httpclient.version}</version>
</dependency>
<dependency>
<groupId>commons-io</groupId>
<artifactId>commons-io</artifactId>
<version>${commons-io.version}</version>
</dependency>
<dependency>
<groupId>commons-lang</groupId>
<artifactId>commons-lang</artifactId>

View File

@@ -23,9 +23,9 @@ import org.apache.zookeeper.Watcher;
import org.apache.zookeeper.WatchedEvent;
/**
- * Class used as an empty watche for the tests
+ * An empty ZooKeeper watcher
*/
- public class EmptyWatcher implements Watcher{
+ public class EmptyWatcher implements Watcher {
public static EmptyWatcher instance = new EmptyWatcher();
private EmptyWatcher() {}

View File

@@ -144,6 +144,12 @@ public final class HConstants {
/** Default limit on concurrent client-side zookeeper connections */
public static final int DEFAULT_ZOOKEPER_MAX_CLIENT_CNXNS = 30;
/** Configuration key for ZooKeeper session timeout */
public static final String ZK_SESSION_TIMEOUT = "zookeeper.session.timeout";
/** Default value for ZooKeeper session timeout */
public static final int DEFAULT_ZK_SESSION_TIMEOUT = 180 * 1000;
/** Parameter name for port region server listens on. */
public static final String REGIONSERVER_PORT = "hbase.regionserver.port";
@@ -153,6 +159,10 @@ public final class HConstants {
/** default port for region server web api */
public static final int DEFAULT_REGIONSERVER_INFOPORT = 60030;
/** A flag that enables automatic selection of regionserver info port */
public static final String REGIONSERVER_INFO_PORT_AUTO =
"hbase.regionserver.info.port.auto";
/** Parameter name for what region server interface to use. */
public static final String REGION_SERVER_CLASS = "hbase.regionserver.class";
@@ -204,6 +214,10 @@ public final class HConstants {
/** Used to construct the name of the compaction directory during compaction */
public static final String HREGION_COMPACTIONDIR_NAME = "compaction.dir";
/** Conf key for the max file size after which we split the region */
public static final String HREGION_MAX_FILESIZE =
"hbase.hregion.max.filesize";
/** Default maximum file size */
public static final long DEFAULT_MAX_FILE_SIZE = 256 * 1024 * 1024;
@@ -547,6 +561,12 @@ public final class HConstants {
"(" + CP_HTD_ATTR_VALUE_PARAM_KEY_PATTERN + ")=(" +
CP_HTD_ATTR_VALUE_PARAM_VALUE_PATTERN + "),?");
/** The delay when re-trying a socket operation in a loop (HBASE-4712) */
public static final int SOCKET_RETRY_WAIT_MS = 200;
/** Host name of the local machine */
public static final String LOCALHOST = "localhost";
private HConstants() {
// Can't be instantiated with this ctor.
}

View File

@@ -800,7 +800,7 @@ public class HConnectionManager {
try {
ServerName servername =
this.rootRegionTracker.waitRootRegionLocation(this.rpcTimeout);
- LOG.debug("Lookedup root region location, connection=" + this +
+ LOG.debug("Looked up root region location, connection=" + this +
"; serverName=" + ((servername == null)? "": servername.toString()));
if (servername == null) return null;
return new HRegionLocation(HRegionInfo.ROOT_REGIONINFO,

View File

@@ -83,7 +83,7 @@ public class HFileOutputFormat extends FileOutputFormat<ImmutableBytesWritable,
final Configuration conf = context.getConfiguration();
final FileSystem fs = outputdir.getFileSystem(conf);
// These configs. are from hbase-*.xml
- final long maxsize = conf.getLong("hbase.hregion.max.filesize",
+ final long maxsize = conf.getLong(HConstants.HREGION_MAX_FILESIZE,
HConstants.DEFAULT_MAX_FILE_SIZE);
final int blocksize = conf.getInt("hbase.mapreduce.hfileoutputformat.blocksize",
HFile.DEFAULT_BLOCKSIZE);

View File

@@ -81,7 +81,7 @@ public class CompactSplitThread implements CompactionRequestor {
// we have a complicated default. see HBASE-3877
long flushSize = conf.getLong("hbase.hregion.memstore.flush.size",
HTableDescriptor.DEFAULT_MEMSTORE_FLUSH_SIZE);
- long splitSize = conf.getLong("hbase.hregion.max.filesize",
+ long splitSize = conf.getLong(HConstants.HREGION_MAX_FILESIZE,
HConstants.DEFAULT_MAX_FILE_SIZE);
throttleSize = Math.min(flushSize * 2, splitSize / 2);
}

View File

@@ -34,7 +34,7 @@ class ConstantSizeRegionSplitPolicy extends RegionSplitPolicy {
// By default we split region if a file > HConstants.DEFAULT_MAX_FILE_SIZE.
if (maxFileSize == HConstants.DEFAULT_MAX_FILE_SIZE) {
- maxFileSize = getConf().getLong("hbase.hregion.max.filesize",
+ maxFileSize = getConf().getLong(HConstants.HREGION_MAX_FILESIZE,
HConstants.DEFAULT_MAX_FILE_SIZE);
}
this.desiredMaxFileSize = maxFileSize;

View File

@@ -1512,7 +1512,8 @@ public class HRegionServer implements HRegionInterface, HBaseRPCErrorHandler,
if (port < 0) return port;
String addr = this.conf.get("hbase.regionserver.info.bindAddress", "0.0.0.0");
// check if auto port bind enabled
- boolean auto = this.conf.getBoolean("hbase.regionserver.info.port.auto", false);
+ boolean auto = this.conf.getBoolean(HConstants.REGIONSERVER_INFO_PORT_AUTO,
+     false);
while (true) {
try {
this.infoServer = new InfoServer("regionserver", addr, port, false, this.conf);

View File

@@ -26,6 +26,7 @@ import java.lang.Class;
import java.lang.reflect.Constructor;
import java.lang.reflect.Field;
import java.lang.reflect.Method;
import java.util.Arrays;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
@@ -33,6 +34,7 @@ import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hdfs.DFSClient.DFSInputStream;
import org.apache.hadoop.io.SequenceFile;
public class SequenceFileLogReader implements HLog.Reader {
@@ -104,13 +106,18 @@ public class SequenceFileLogReader implements HLog.Reader {
Field fIn = FilterInputStream.class.getDeclaredField("in");
fIn.setAccessible(true);
Object realIn = fIn.get(this.in);
if (realIn.getClass() == DFSInputStream.class) {
Method getFileLength = realIn.getClass().
- getMethod("getFileLength", new Class<?> []{});
+ getDeclaredMethod("getFileLength", new Class<?> []{});
getFileLength.setAccessible(true);
long realLength = ((Long)getFileLength.
invoke(realIn, new Object []{})).longValue();
assert(realLength >= this.length);
adjust = realLength - this.length;
} else {
LOG.info("Input stream class: " + realIn.getClass().getName() +
", not adjusting length");
}
} catch(Exception e) {
SequenceFileLogReader.LOG.warn(
"Error while trying to get accurate file length. " +

View File

@@ -0,0 +1,181 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with this
* work for additional information regarding copyright ownership. The ASF
* licenses this file to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations
* under the License.
*/
package org.apache.hadoop.hbase.util;
import java.util.Set;
import java.util.TreeSet;
import org.apache.commons.cli.BasicParser;
import org.apache.commons.cli.CommandLine;
import org.apache.commons.cli.CommandLineParser;
import org.apache.commons.cli.HelpFormatter;
import org.apache.commons.cli.Options;
import org.apache.commons.cli.ParseException;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;
/**
* Common base class used for HBase command-line tools. Simplifies workflow and
* command-line argument parsing.
*/
public abstract class AbstractHBaseTool implements Tool {
private static final int EXIT_SUCCESS = 0;
private static final int EXIT_FAILURE = 1;
private static final String HELP_OPTION = "help";
private static final Log LOG = LogFactory.getLog(AbstractHBaseTool.class);
private final Options options = new Options();
protected Configuration conf = null;
private static final Set<String> requiredOptions = new TreeSet<String>();
/**
* Override this to add command-line options using {@link #addOptWithArg}
* and similar methods.
*/
protected abstract void addOptions();
/**
* This method is called to process the options after they have been parsed.
*/
protected abstract void processOptions(CommandLine cmd);
/** The "main function" of the tool */
protected abstract void doWork() throws Exception;
@Override
public Configuration getConf() {
return conf;
}
@Override
public void setConf(Configuration conf) {
this.conf = conf;
}
@Override
public final int run(String[] args) throws Exception {
if (conf == null) {
LOG.error("Tool configuration is not initialized");
throw new NullPointerException("conf");
}
CommandLine cmd;
try {
// parse the command line arguments
cmd = parseArgs(args);
} catch (ParseException e) {
LOG.error("Error when parsing command-line arguments", e);
printUsage();
return EXIT_FAILURE;
}
if (cmd.hasOption(HELP_OPTION) || !sanityCheckOptions(cmd)) {
printUsage();
return EXIT_FAILURE;
}
processOptions(cmd);
try {
doWork();
} catch (Exception e) {
LOG.error("Error running command-line tool", e);
return EXIT_FAILURE;
}
return EXIT_SUCCESS;
}
private boolean sanityCheckOptions(CommandLine cmd) {
boolean success = true;
for (String reqOpt : requiredOptions) {
if (!cmd.hasOption(reqOpt)) {
LOG.error("Required option -" + reqOpt + " is missing");
success = false;
}
}
return success;
}
private CommandLine parseArgs(String[] args) throws ParseException {
options.addOption(HELP_OPTION, false, "Show usage");
addOptions();
CommandLineParser parser = new BasicParser();
return parser.parse(options, args);
}
private void printUsage() {
HelpFormatter helpFormatter = new HelpFormatter();
helpFormatter.setWidth(80);
String usageHeader = "Options:";
String usageFooter = "";
String usageStr = "bin/hbase " + getClass().getName() + " <options>";
helpFormatter.printHelp(usageStr, usageHeader, options,
usageFooter);
}
protected void addRequiredOptWithArg(String opt, String description) {
requiredOptions.add(opt);
addOptWithArg(opt, description);
}
protected void addOptNoArg(String opt, String description) {
options.addOption(opt, false, description);
}
protected void addOptWithArg(String opt, String description) {
options.addOption(opt, true, description);
}
/**
* Parse a number and enforce a range.
*/
public static long parseLong(String s, long minValue, long maxValue) {
long l = Long.parseLong(s);
if (l < minValue || l > maxValue) {
throw new IllegalArgumentException("The value " + l
+ " is out of range [" + minValue + ", " + maxValue + "]");
}
return l;
}
public static int parseInt(String s, int minValue, int maxValue) {
return (int) parseLong(s, minValue, maxValue);
}
/** Call this from the concrete tool class's main function. */
protected void doStaticMain(String args[]) {
int ret;
try {
ret = ToolRunner.run(HBaseConfiguration.create(), this, args);
} catch (Exception ex) {
LOG.error("Error running command-line tool", ex);
ret = EXIT_FAILURE;
}
System.exit(ret);
}
}
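
For orientation, here is a minimal sketch of how a concrete tool plugs into AbstractHBaseTool; the ExampleRowCountTool class below is hypothetical and not part of this patch. Options are declared in addOptions(), read back in processOptions(), the actual work happens in doWork() (where conf has already been populated by ToolRunner), and main() delegates to doStaticMain().

package org.apache.hadoop.hbase.util;

import org.apache.commons.cli.CommandLine;

// Hypothetical example only; illustrates the AbstractHBaseTool workflow.
public class ExampleRowCountTool extends AbstractHBaseTool {
  private String tableName;
  private int limit;

  @Override
  protected void addOptions() {
    addRequiredOptWithArg("tn", "Table to scan");
    addOptWithArg("limit", "Maximum number of rows to count (default 100)");
  }

  @Override
  protected void processOptions(CommandLine cmd) {
    tableName = cmd.getOptionValue("tn");
    limit = parseInt(cmd.getOptionValue("limit", "100"), 1, Integer.MAX_VALUE);
  }

  @Override
  protected void doWork() throws Exception {
    // 'conf' is already set up here; open an HTable against it and scan up to
    // 'limit' rows of 'tableName'. Omitted to keep the sketch short.
  }

  public static void main(String[] args) {
    new ExampleRowCountTool().doStaticMain(args);
  }
}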

View File

@@ -33,6 +33,7 @@ import java.security.PrivilegedAction;
import java.util.Comparator;
import java.util.Iterator;
import org.apache.commons.lang.StringUtils;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.hbase.HConstants;

View File

@@ -145,7 +145,7 @@ class HMerge {
throws IOException {
this.conf = conf;
this.fs = fs;
- this.maxFilesize = conf.getLong("hbase.hregion.max.filesize",
+ this.maxFilesize = conf.getLong(HConstants.HREGION_MAX_FILESIZE,
HConstants.DEFAULT_MAX_FILE_SIZE);
this.tabledir = new Path(

View File

@@ -112,4 +112,5 @@ public class Keying {
}
return sb.toString();
}
}

View File

@@ -79,7 +79,7 @@ import com.google.common.collect.Sets;
* <p>
* <b>Question:</b> How do I turn off automatic splitting? <br>
* <b>Answer:</b> Automatic splitting is determined by the configuration value
- * <i>"hbase.hregion.max.filesize"</i>. It is not recommended that you set this
+ * <i>HConstants.HREGION_MAX_FILESIZE</i>. It is not recommended that you set this
* to Long.MAX_VALUE in case you forget about manual splits. A suggested setting
* is 100GB, which would result in > 1hr major compactions if reached.
* <p>

View File

@@ -96,7 +96,7 @@ public class ZKConfig {
int leaderPort = conf.getInt("hbase.zookeeper.leaderport", 3888);
final String[] serverHosts = conf.getStrings(HConstants.ZOOKEEPER_QUORUM,
- "localhost");
+ HConstants.LOCALHOST);
for (int i = 0; i < serverHosts.length; ++i) {
String serverHost = serverHosts[i];
String address = serverHost + ":" + peerPort + ":" + leaderPort;
@@ -160,7 +160,7 @@
// Special case for 'hbase.cluster.distributed' property being 'true'
if (key.startsWith("server.")) {
if (conf.get(HConstants.CLUSTER_DISTRIBUTED).equals(HConstants.CLUSTER_IS_DISTRIBUTED)
- && value.startsWith("localhost")) {
+ && value.startsWith(HConstants.LOCALHOST)) {
String msg = "The server in zoo.cfg cannot be set to localhost " +
"in a fully-distributed setup because it won't be reachable. " +
"See \"Getting Started\" for more information.";

View File

@@ -33,15 +33,18 @@ import org.apache.commons.lang.StringUtils;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.EmptyWatcher;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.executor.RegionTransitionData;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.Threads;
import org.apache.zookeeper.AsyncCallback;
import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.KeeperException;
- import org.apache.zookeeper.Watcher;
import org.apache.zookeeper.KeeperException.NoNodeException;
+ import org.apache.zookeeper.Watcher;
import org.apache.zookeeper.ZooDefs.Ids;
import org.apache.zookeeper.ZooKeeper;
import org.apache.zookeeper.data.ACL;
import org.apache.zookeeper.data.Stat;
@@ -91,7 +94,8 @@ public class ZKUtil {
if(ensemble == null) {
throw new IOException("Unable to determine ZooKeeper ensemble");
}
- int timeout = conf.getInt("zookeeper.session.timeout", 180 * 1000);
+ int timeout = conf.getInt(HConstants.ZK_SESSION_TIMEOUT,
+     HConstants.DEFAULT_ZK_SESSION_TIMEOUT);
LOG.debug(descriptor + " opening connection to ZooKeeper with ensemble (" +
ensemble + ")");
int retry = conf.getInt("zookeeper.recovery.retry", 3);
@@ -1116,4 +1120,46 @@ public class ZKUtil {
RegionTransitionData.fromBytes(data).toString()
: StringUtils.abbreviate(Bytes.toStringBinary(data), 32)))));
}
/**
* Waits for HBase installation's base (parent) znode to become available.
* @throws IOException on ZK errors
*/
public static void waitForBaseZNode(Configuration conf) throws IOException {
LOG.info("Waiting until the base znode is available");
String parentZNode = conf.get(HConstants.ZOOKEEPER_ZNODE_PARENT,
HConstants.DEFAULT_ZOOKEEPER_ZNODE_PARENT);
ZooKeeper zk = new ZooKeeper(ZKConfig.getZKQuorumServersString(conf),
conf.getInt(HConstants.ZK_SESSION_TIMEOUT,
HConstants.DEFAULT_ZK_SESSION_TIMEOUT), EmptyWatcher.instance);
final int maxTimeMs = 10000;
final int maxNumAttempts = maxTimeMs / HConstants.SOCKET_RETRY_WAIT_MS;
KeeperException keeperEx = null;
try {
try {
for (int attempt = 0; attempt < maxNumAttempts; ++attempt) {
try {
if (zk.exists(parentZNode, false) != null) {
LOG.info("Parent znode exists: " + parentZNode);
keeperEx = null;
break;
}
} catch (KeeperException e) {
keeperEx = e;
}
Threads.sleepWithoutInterrupt(HConstants.SOCKET_RETRY_WAIT_MS);
}
} finally {
zk.close();
}
} catch (InterruptedException ex) {
Thread.currentThread().interrupt();
}
if (keeperEx != null) {
throw new IOException(keeperEx);
}
}
}

View File

@@ -25,7 +25,10 @@ import java.io.File;
import java.io.IOException;
import java.io.OutputStream;
import java.lang.reflect.Field;
import java.net.InetAddress;
import java.net.ServerSocket;
import java.net.Socket;
import java.net.UnknownHostException;
import java.security.MessageDigest;
import java.util.ArrayList;
import java.util.Arrays;
@@ -64,6 +67,8 @@ import org.apache.hadoop.hbase.regionserver.StoreFile;
import org.apache.hadoop.hbase.security.User;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.FSUtils;
import org.apache.hadoop.hbase.util.Keying;
import org.apache.hadoop.hbase.util.RegionSplitter;
import org.apache.hadoop.hbase.util.Threads;
import org.apache.hadoop.hbase.util.Writables;
import org.apache.hadoop.hbase.zookeeper.MiniZooKeeperCluster;
@@ -73,7 +78,6 @@ import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher;
import org.apache.hadoop.hdfs.DFSClient;
import org.apache.hadoop.hdfs.DistributedFileSystem;
import org.apache.hadoop.hdfs.MiniDFSCluster;
- import org.apache.hadoop.hdfs.server.namenode.NameNode;
import org.apache.hadoop.mapred.MiniMRCluster;
import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.KeeperException.NodeExistsException;
@@ -93,6 +97,13 @@ public class HBaseTestingUtility {
private static final Log LOG = LogFactory.getLog(HBaseTestingUtility.class);
private Configuration conf;
private MiniZooKeeperCluster zkCluster = null;
/**
* The default number of regions per regionserver when creating a pre-split
* table.
*/
private static int DEFAULT_REGIONS_PER_SERVER = 5;
/**
* Set if we were passed a zkCluster. If so, we won't shutdown zk as
* part of general shutdown.
@@ -1754,4 +1765,77 @@
return port;
}
public static void waitForHostPort(String host, int port)
throws IOException {
final int maxTimeMs = 10000;
final int maxNumAttempts = maxTimeMs / HConstants.SOCKET_RETRY_WAIT_MS;
IOException savedException = null;
LOG.info("Waiting for server at " + host + ":" + port);
for (int attempt = 0; attempt < maxNumAttempts; ++attempt) {
try {
Socket sock = new Socket(InetAddress.getByName(host), port);
sock.close();
savedException = null;
LOG.info("Server at " + host + ":" + port + " is available");
break;
} catch (UnknownHostException e) {
throw new IOException("Failed to look up " + host, e);
} catch (IOException e) {
savedException = e;
}
Threads.sleepWithoutInterrupt(HConstants.SOCKET_RETRY_WAIT_MS);
}
if (savedException != null) {
throw savedException;
}
}
/**
* Creates a pre-split table for load testing. If the table already exists,
* logs a warning and continues.
* @return the number of regions the table was split into
*/
public static int createPreSplitLoadTestTable(Configuration conf,
byte[] tableName, byte[] columnFamily) throws IOException {
HTableDescriptor desc = new HTableDescriptor(tableName);
desc.addFamily(new HColumnDescriptor(columnFamily));
int totalNumberOfRegions = 0;
try {
HBaseAdmin admin = new HBaseAdmin(conf);
// create a table with pre-split regions.
// The number of splits is set as:
// region servers * regions per region server).
int numberOfServers = admin.getClusterStatus().getServers().size();
if (numberOfServers == 0) {
throw new IllegalStateException("No live regionservers");
}
totalNumberOfRegions = numberOfServers * DEFAULT_REGIONS_PER_SERVER;
LOG.info("Number of live regionservers: " + numberOfServers + ", " +
"pre-splitting table into " + totalNumberOfRegions + " regions " +
"(default regions per server: " + DEFAULT_REGIONS_PER_SERVER + ")");
byte[][] splits = new RegionSplitter.HexStringSplit().split(
totalNumberOfRegions);
admin.createTable(desc, splits);
} catch (MasterNotRunningException e) {
LOG.error("Master not running", e);
throw new IOException(e);
} catch (TableExistsException e) {
LOG.warn("Table " + Bytes.toStringBinary(tableName) +
" already exists, continuing");
}
return totalNumberOfRegions;
}
public static int getMetaRSPort(Configuration conf) throws IOException {
HTable table = new HTable(conf, HConstants.META_TABLE_NAME);
HRegionLocation hloc = table.getRegionLocation(Bytes.toBytes(""));
return hloc.getPort();
}
}
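
As a usage sketch of the two public helpers added above (hypothetical host name and class, not part of the patch): wait until a server port is reachable, then create a pre-split load-test table sized to the number of live regionservers.

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HBaseTestingUtility;
import org.apache.hadoop.hbase.util.Bytes;

// Hypothetical example only.
public class PreSplitTableExample {
  public static void main(String[] args) throws Exception {
    Configuration conf = HBaseConfiguration.create();
    // Blocks, retrying every SOCKET_RETRY_WAIT_MS, until the port accepts connections.
    HBaseTestingUtility.waitForHostPort("rs1.example.com", 60020);
    // Creates (#live regionservers * DEFAULT_REGIONS_PER_SERVER) regions using
    // RegionSplitter.HexStringSplit boundaries; logs a warning if the table exists.
    int regions = HBaseTestingUtility.createPreSplitLoadTestTable(conf,
        Bytes.toBytes("cluster_test"), Bytes.toBytes("test_cf"));
    System.out.println("Table pre-split into " + regions + " regions");
  }
}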

View File

@@ -1372,7 +1372,7 @@ public class TestAdmin {
private void setUpforLogRolling() {
// Force a region split after every 768KB
- TEST_UTIL.getConfiguration().setLong("hbase.hregion.max.filesize",
+ TEST_UTIL.getConfiguration().setLong(HConstants.HREGION_MAX_FILESIZE,
768L * 1024L);
// We roll the log after every 32 writes

View File

@@ -303,7 +303,7 @@ public class TestCoprocessorInterface extends HBaseTestCase {
TEST_UTIL.getConfiguration().setLong("hbase.client.pause", 15 * 1000);
// This size should make it so we always split using the addContent
// below. After adding all data, the first region is 1.3M
- TEST_UTIL.getConfiguration().setLong("hbase.hregion.max.filesize",
+ TEST_UTIL.getConfiguration().setLong(HConstants.HREGION_MAX_FILESIZE,
1024 * 128);
TEST_UTIL.getConfiguration().setBoolean("hbase.testing.nocluster",
true);

View File

@@ -314,7 +314,7 @@ public class TestHFileOutputFormat {
// Set down this value or we OOME in eclipse.
conf.setInt("io.sort.mb", 20);
// Write a few files.
- conf.setLong("hbase.hregion.max.filesize", 64 * 1024);
+ conf.setLong(HConstants.HREGION_MAX_FILESIZE, 64 * 1024);
Job job = new Job(conf, "testWritingPEData");
setupRandomGeneratorMapper(job);

View File

@@ -3278,7 +3278,7 @@ public class TestHRegion extends HBaseTestCase {
// This size should make it so we always split using the addContent
// below. After adding all data, the first region is 1.3M
- conf.setLong("hbase.hregion.max.filesize", 1024 * 128);
+ conf.setLong(HConstants.HREGION_MAX_FILESIZE, 1024 * 128);
return conf;
}

View File

@@ -55,7 +55,7 @@ public class TestRegionSplitPolicy {
@Test
public void testCreateDefault() throws IOException {
- conf.setLong("hbase.hregion.max.filesize", 1234L);
+ conf.setLong(HConstants.HREGION_MAX_FILESIZE, 1234L);
// Using a default HTD, should pick up the file size from
// configuration.

View File

@@ -115,7 +115,7 @@ public class TestLogRolling {
public static void setUpBeforeClass() throws Exception {
/**** configuration for testLogRolling ****/
// Force a region split after every 768KB
- TEST_UTIL.getConfiguration().setLong("hbase.hregion.max.filesize", 768L * 1024L);
+ TEST_UTIL.getConfiguration().setLong(HConstants.HREGION_MAX_FILESIZE, 768L * 1024L);
// We roll the log after every 32 writes
TEST_UTIL.getConfiguration().setInt("hbase.regionserver.maxlogentries", 32);

View File

@@ -0,0 +1,101 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with this
* work for additional information regarding copyright ownership. The ASF
* licenses this file to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations
* under the License.
*/
package org.apache.hadoop.hbase.util;
import java.util.Random;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.MD5Hash;
/**
* A generator of random keys and values for load testing. Keys are generated
* by converting numeric indexes to strings and prefixing them with an MD5
* hash. Values are generated by selecting value size in the configured range
* and generating a pseudo-random sequence of bytes seeded by key, column
* qualifier, and value size.
* <p>
* Not thread-safe, so a separate instance is needed for every writer thread.
*/
public class LoadTestKVGenerator {
/** A random number generator for determining value size */
private Random randomForValueSize = new Random();
private final int minValueSize;
private final int maxValueSize;
public LoadTestKVGenerator(int minValueSize, int maxValueSize) {
if (minValueSize <= 0 || maxValueSize <= 0) {
throw new IllegalArgumentException("Invalid min/max value sizes: " +
minValueSize + ", " + maxValueSize);
}
this.minValueSize = minValueSize;
this.maxValueSize = maxValueSize;
}
/**
* Verifies that the given byte array is the same as what would be generated
* for the given row key and qualifier. We are assuming that the value size
* is correct, and only verify the actual bytes. However, if the min/max
* value sizes are set sufficiently high, an accidental match should be
* extremely improbable.
*/
public static boolean verify(String rowKey, String qual, byte[] value) {
byte[] expectedData = getValueForRowColumn(rowKey, qual, value.length);
return Bytes.equals(expectedData, value);
}
/**
* Converts the given key to string, and prefixes it with the MD5 hash of
* the index's string representation.
*/
public static String md5PrefixedKey(long key) {
String stringKey = Long.toString(key);
String md5hash = MD5Hash.getMD5AsHex(Bytes.toBytes(stringKey));
// flip the key to randomize
return md5hash + ":" + stringKey;
}
/**
* Generates a value for the given key index and column qualifier. Size is
* selected randomly in the configured range. The generated value depends
* only on the combination of the key, qualifier, and the selected value
* size. This allows to verify the actual value bytes when reading, as done
* in {@link #verify(String, String, byte[])}.
*/
public byte[] generateRandomSizeValue(long key, String qual) {
String rowKey = md5PrefixedKey(key);
int dataSize = minValueSize + randomForValueSize.nextInt(
Math.abs(maxValueSize - minValueSize));
return getValueForRowColumn(rowKey, qual, dataSize);
}
/**
* Generates random bytes of the given size for the given row and column
* qualifier. The random seed is fully determined by these parameters.
*/
private static byte[] getValueForRowColumn(String rowKey, String qual,
int dataSize) {
Random seededRandom = new Random(rowKey.hashCode() + qual.hashCode() +
dataSize);
byte[] randomBytes = new byte[dataSize];
seededRandom.nextBytes(randomBytes);
return randomBytes;
}
}
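
A small sketch of the generate-and-verify round trip described in the class comment above (hypothetical standalone snippet, not part of the patch): since the value bytes are a deterministic function of the row key, qualifier, and chosen size, the read path can regenerate the expected bytes and compare them without recording what was written.

import org.apache.hadoop.hbase.util.LoadTestKVGenerator;

// Hypothetical example only.
public class KVGeneratorExample {
  public static void main(String[] args) {
    // Value sizes are picked pseudo-randomly in the [64, 128) byte range.
    LoadTestKVGenerator gen = new LoadTestKVGenerator(64, 128);
    long key = 12345;
    String qual = "col_0";
    // Row key is the MD5 hex of "12345", a colon, then "12345".
    String rowKey = LoadTestKVGenerator.md5PrefixedKey(key);
    byte[] value = gen.generateRandomSizeValue(key, qual);
    // Reader-side check: regenerates the bytes from (rowKey, qual, value.length).
    System.out.println("verified: "
        + LoadTestKVGenerator.verify(rowKey, qual, value));
  }
}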

View File

@@ -0,0 +1,305 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with this
* work for additional information regarding copyright ownership. The ASF
* licenses this file to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations
* under the License.
*/
package org.apache.hadoop.hbase.util;
import java.io.IOException;
import java.util.Arrays;
import org.apache.commons.cli.CommandLine;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.hbase.HBaseTestingUtility;
import org.apache.hadoop.hbase.HColumnDescriptor;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.PerformanceEvaluation;
import org.apache.hadoop.hbase.client.HBaseAdmin;
import org.apache.hadoop.hbase.io.hfile.Compression;
import org.apache.hadoop.hbase.regionserver.StoreFile;
/**
* A command-line utility that reads, writes, and verifies data. Unlike
* {@link PerformanceEvaluation}, this tool validates the data written,
* and supports simultaneously writing and reading the same set of keys.
*/
public class LoadTestTool extends AbstractHBaseTool {
private static final Log LOG = LogFactory.getLog(LoadTestTool.class);
/** Table name for the test */
private byte[] tableName;
/** Table name to use if not overridden on the command line */
private static final String DEFAULT_TABLE_NAME = "cluster_test";
/** Column family used by the test */
static byte[] COLUMN_FAMILY = Bytes.toBytes("test_cf");
/** Column families used by the test */
static final byte[][] COLUMN_FAMILIES = { COLUMN_FAMILY };
/** The number of reader/writer threads if not specified */
private static final int DEFAULT_NUM_THREADS = 20;
/** Usage string for the load option */
private static final String OPT_USAGE_LOAD =
"<avg_cols_per_key>:<avg_data_size>" +
"[:<#threads=" + DEFAULT_NUM_THREADS + ">]";
/** Usage string for the read option */
private static final String OPT_USAGE_READ =
"<verify_percent>[:<#threads=" + DEFAULT_NUM_THREADS + ">]";
private static final String OPT_USAGE_BLOOM = "Bloom filter type, one of " +
Arrays.toString(StoreFile.BloomType.values());
private static final String OPT_USAGE_COMPRESSION = "Compression type, " +
"one of " + Arrays.toString(Compression.Algorithm.values());
private static final String OPT_BLOOM = "bloom";
private static final String OPT_COMPRESSION = "compression";
private static final String OPT_KEY_WINDOW = "key_window";
private static final String OPT_WRITE = "write";
private static final String OPT_MAX_READ_ERRORS = "max_read_errors";
private static final String OPT_MULTIPUT = "multiput";
private static final String OPT_NUM_KEYS = "num_keys";
private static final String OPT_READ = "read";
private static final String OPT_START_KEY = "start_key";
private static final String OPT_TABLE_NAME = "tn";
private static final String OPT_ZK_QUORUM = "zk";
/** This will be removed as we factor out the dependency on command line */
private CommandLine cmd;
private MultiThreadedWriter writerThreads = null;
private MultiThreadedReader readerThreads = null;
private long startKey, endKey;
private boolean isWrite, isRead;
// Writer options
private int numWriterThreads = DEFAULT_NUM_THREADS;
private long minColsPerKey, maxColsPerKey;
private int minColDataSize, maxColDataSize;
private boolean isMultiPut;
// Reader options
private int numReaderThreads = DEFAULT_NUM_THREADS;
private int keyWindow = MultiThreadedReader.DEFAULT_KEY_WINDOW;
private int maxReadErrors = MultiThreadedReader.DEFAULT_MAX_ERRORS;
private int verifyPercent;
/** Create tables if needed. */
public void createTables() throws IOException {
HBaseTestingUtility.createPreSplitLoadTestTable(conf, tableName,
COLUMN_FAMILY);
applyBloomFilterAndCompression(tableName, COLUMN_FAMILIES);
}
private String[] splitColonSeparated(String option,
int minNumCols, int maxNumCols) {
String optVal = cmd.getOptionValue(option);
String[] cols = optVal.split(":");
if (cols.length < minNumCols || cols.length > maxNumCols) {
throw new IllegalArgumentException("Expected at least "
+ minNumCols + " columns but no more than " + maxNumCols +
" in the colon-separated value '" + optVal + "' of the " +
"-" + option + " option");
}
return cols;
}
private int getNumThreads(String numThreadsStr) {
return parseInt(numThreadsStr, 1, Short.MAX_VALUE);
}
/**
* Apply the given Bloom filter type to all column families we care about.
*/
private void applyBloomFilterAndCompression(byte[] tableName,
byte[][] columnFamilies) throws IOException {
String bloomStr = cmd.getOptionValue(OPT_BLOOM);
StoreFile.BloomType bloomType = bloomStr == null ? null :
StoreFile.BloomType.valueOf(bloomStr);
String compressStr = cmd.getOptionValue(OPT_COMPRESSION);
Compression.Algorithm compressAlgo = compressStr == null ? null :
Compression.Algorithm.valueOf(compressStr);
if (bloomStr == null && compressStr == null)
return;
HBaseAdmin admin = new HBaseAdmin(conf);
HTableDescriptor tableDesc = admin.getTableDescriptor(tableName);
LOG.info("Disabling table " + Bytes.toString(tableName));
admin.disableTable(tableName);
for (byte[] cf : columnFamilies) {
HColumnDescriptor columnDesc = tableDesc.getFamily(cf);
if (bloomStr != null)
columnDesc.setBloomFilterType(bloomType);
if (compressStr != null)
columnDesc.setCompressionType(compressAlgo);
admin.modifyColumn(tableName, columnDesc);
}
LOG.info("Enabling table " + Bytes.toString(tableName));
admin.enableTable(tableName);
}
@Override
protected void addOptions() {
addOptWithArg(OPT_ZK_QUORUM, "ZK quorum as comma-separated host names " +
"without port numbers");
addOptWithArg(OPT_TABLE_NAME, "The name of the table to read or write");
addOptWithArg(OPT_WRITE, OPT_USAGE_LOAD);
addOptWithArg(OPT_READ, OPT_USAGE_READ);
addOptWithArg(OPT_BLOOM, OPT_USAGE_BLOOM);
addOptWithArg(OPT_COMPRESSION, OPT_USAGE_COMPRESSION);
addOptWithArg(OPT_MAX_READ_ERRORS, "The maximum number of read errors " +
"to tolerate before terminating all reader threads. The default is " +
MultiThreadedReader.DEFAULT_MAX_ERRORS + ".");
addOptWithArg(OPT_KEY_WINDOW, "The 'key window' to maintain between " +
"reads and writes for concurrent write/read workload. The default " +
"is " + MultiThreadedReader.DEFAULT_KEY_WINDOW + ".");
addOptNoArg(OPT_MULTIPUT, "Whether to use multi-puts as opposed to " +
"separate puts for every column in a row");
addRequiredOptWithArg(OPT_NUM_KEYS, "The number of keys to read/write");
addRequiredOptWithArg(OPT_START_KEY, "The first key to read/write");
}
@Override
protected void processOptions(CommandLine cmd) {
this.cmd = cmd;
tableName = Bytes.toBytes(cmd.getOptionValue(OPT_TABLE_NAME,
DEFAULT_TABLE_NAME));
startKey = parseLong(cmd.getOptionValue(OPT_START_KEY), 0,
Long.MAX_VALUE);
long numKeys = parseLong(cmd.getOptionValue(OPT_NUM_KEYS), 1,
Long.MAX_VALUE - startKey);
endKey = startKey + numKeys;
isWrite = cmd.hasOption(OPT_WRITE);
isRead = cmd.hasOption(OPT_READ);
if (!isWrite && !isRead) {
throw new IllegalArgumentException("Either -" + OPT_WRITE + " or " +
"-" + OPT_READ + " has to be specified");
}
if (isWrite) {
String[] writeOpts = splitColonSeparated(OPT_WRITE, 2, 3);
int colIndex = 0;
minColsPerKey = 1;
maxColsPerKey = 2 * Long.parseLong(writeOpts[colIndex++]);
int avgColDataSize =
parseInt(writeOpts[colIndex++], 1, Integer.MAX_VALUE);
minColDataSize = avgColDataSize / 2;
maxColDataSize = avgColDataSize * 3 / 2;
if (colIndex < writeOpts.length) {
numWriterThreads = getNumThreads(writeOpts[colIndex++]);
}
isMultiPut = cmd.hasOption(OPT_MULTIPUT);
System.out.println("Multi-puts: " + isMultiPut);
System.out.println("Columns per key: " + minColsPerKey + ".."
+ maxColsPerKey);
System.out.println("Data size per column: " + minColDataSize + ".."
+ maxColDataSize);
}
if (isRead) {
String[] readOpts = splitColonSeparated(OPT_READ, 1, 2);
int colIndex = 0;
verifyPercent = parseInt(readOpts[colIndex++], 0, 100);
if (colIndex < readOpts.length) {
numReaderThreads = getNumThreads(readOpts[colIndex++]);
}
if (cmd.hasOption(OPT_MAX_READ_ERRORS)) {
maxReadErrors = parseInt(cmd.getOptionValue(OPT_MAX_READ_ERRORS),
0, Integer.MAX_VALUE);
}
if (cmd.hasOption(OPT_KEY_WINDOW)) {
keyWindow = parseInt(cmd.getOptionValue(OPT_KEY_WINDOW),
0, Integer.MAX_VALUE);
}
System.out.println("Percent of keys to verify: " + verifyPercent);
System.out.println("Reader threads: " + numReaderThreads);
}
System.out.println("Key range: " + startKey + ".." + (endKey - 1));
}
@Override
protected void doWork() throws IOException {
if (cmd.hasOption(OPT_ZK_QUORUM)) {
conf.set(HConstants.ZOOKEEPER_QUORUM, cmd.getOptionValue(OPT_ZK_QUORUM));
}
createTables();
if (isWrite) {
writerThreads = new MultiThreadedWriter(conf, tableName, COLUMN_FAMILY);
writerThreads.setMultiPut(isMultiPut);
writerThreads.setColumnsPerKey(minColsPerKey, maxColsPerKey);
writerThreads.setDataSize(minColDataSize, maxColDataSize);
}
if (isRead) {
readerThreads = new MultiThreadedReader(conf, tableName, COLUMN_FAMILY,
verifyPercent);
readerThreads.setMaxErrors(maxReadErrors);
readerThreads.setKeyWindow(keyWindow);
}
if (isRead && isWrite) {
LOG.info("Concurrent read/write workload: making readers aware of the " +
"write point");
readerThreads.linkToWriter(writerThreads);
}
if (isWrite) {
System.out.println("Starting to write data...");
writerThreads.start(startKey, endKey, numWriterThreads);
}
if (isRead) {
System.out.println("Starting to read data...");
readerThreads.start(startKey, endKey, numReaderThreads);
}
if (isWrite) {
writerThreads.waitForFinish();
}
if (isRead) {
readerThreads.waitForFinish();
}
}
public static void main(String[] args) {
new LoadTestTool().doStaticMain(args);
}
}
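
For reference, tracing the example invocation from the summary through processOptions() above: -write 50:100:20 yields minColsPerKey = 1, maxColsPerKey = 2 * 50 = 100, an average column value size of 100 bytes (so minColDataSize = 50 and maxColDataSize = 150), and 20 writer threads. -read 100:10 verifies 100% of the keys it reads using 10 reader threads. -num_keys 10000000000 with -start_key 0 gives the key range [0, 10000000000). -key_window 5 and -max_read_errors 10000 are passed to MultiThreadedReader.setKeyWindow() and setMaxErrors(), and -bloom ROWCOL with -compression LZO is applied to each column family by applyBloomFilterAndCompression(), which disables and re-enables the table.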

View File

@@ -0,0 +1,205 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with this
* work for additional information regarding copyright ownership. The ASF
* licenses this file to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations
* under the License.
*/
package org.apache.hadoop.hbase.util;
import java.io.IOException;
import java.util.Collection;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.util.StringUtils;
/**
* Common base class for reader and writer parts of multi-thread HBase load
* test ({@link LoadTestTool}).
*/
public abstract class MultiThreadedAction {
private static final Log LOG = LogFactory.getLog(MultiThreadedAction.class);
protected final byte[] tableName;
protected final byte[] columnFamily;
protected final Configuration conf;
protected int numThreads = 1;
/** The start key of the key range, inclusive */
protected long startKey = 0;
/** The end key of the key range, exclusive */
protected long endKey = 1;
protected AtomicInteger numThreadsWorking = new AtomicInteger();
protected AtomicLong numKeys = new AtomicLong();
protected AtomicLong numCols = new AtomicLong();
protected AtomicLong totalOpTimeMs = new AtomicLong();
protected boolean verbose = false;
protected int minDataSize = 256;
protected int maxDataSize = 1024;
/** "R" or "W" */
private String actionLetter;
/** Whether we need to print out Hadoop Streaming-style counters */
private boolean streamingCounters;
public static final int REPORTING_INTERVAL_MS = 5000;
public MultiThreadedAction(Configuration conf, byte[] tableName,
byte[] columnFamily, String actionLetter) {
this.conf = conf;
this.tableName = tableName;
this.columnFamily = columnFamily;
this.actionLetter = actionLetter;
}
public void start(long startKey, long endKey, int numThreads)
throws IOException {
this.startKey = startKey;
this.endKey = endKey;
this.numThreads = numThreads;
(new Thread(new ProgressReporter(actionLetter))).start();
}
private static String formatTime(long elapsedTime) {
String format = String.format("%%0%dd", 2);
elapsedTime = elapsedTime / 1000;
String seconds = String.format(format, elapsedTime % 60);
String minutes = String.format(format, (elapsedTime % 3600) / 60);
String hours = String.format(format, elapsedTime / 3600);
String time = hours + ":" + minutes + ":" + seconds;
return time;
}
/** Asynchronously reports progress */
private class ProgressReporter implements Runnable {
private String reporterId = "";
public ProgressReporter(String id) {
this.reporterId = id;
}
@Override
public void run() {
long startTime = System.currentTimeMillis();
long priorNumKeys = 0;
long priorCumulativeOpTime = 0;
int priorAverageKeysPerSecond = 0;
// Give other threads time to start.
Threads.sleep(REPORTING_INTERVAL_MS);
while (numThreadsWorking.get() != 0) {
String threadsLeft =
"[" + reporterId + ":" + numThreadsWorking.get() + "] ";
if (numKeys.get() == 0) {
LOG.info(threadsLeft + "Number of keys = 0");
} else {
long numKeys = MultiThreadedAction.this.numKeys.get();
long time = System.currentTimeMillis() - startTime;
long totalOpTime = totalOpTimeMs.get();
long numKeysDelta = numKeys - priorNumKeys;
long totalOpTimeDelta = totalOpTime - priorCumulativeOpTime;
double averageKeysPerSecond =
(time > 0) ? (numKeys * 1000 / time) : 0;
LOG.info(threadsLeft
+ "Keys="
+ numKeys
+ ", cols="
+ StringUtils.humanReadableInt(numCols.get())
+ ", time="
+ formatTime(time)
+ ((numKeys > 0 && time > 0) ? (" Overall: [" + "keys/s= "
+ numKeys * 1000 / time + ", latency=" + totalOpTime
/ numKeys + " ms]") : "")
+ ((numKeysDelta > 0) ? (" Current: [" + "keys/s="
+ numKeysDelta * 1000 / REPORTING_INTERVAL_MS + ", latency="
+ totalOpTimeDelta / numKeysDelta + " ms]") : "")
+ progressInfo());
if (streamingCounters) {
printStreamingCounters(numKeysDelta,
averageKeysPerSecond - priorAverageKeysPerSecond);
}
priorNumKeys = numKeys;
priorCumulativeOpTime = totalOpTime;
priorAverageKeysPerSecond = (int) averageKeysPerSecond;
}
Threads.sleep(REPORTING_INTERVAL_MS);
}
}
private void printStreamingCounters(long numKeysDelta,
double avgKeysPerSecondDelta) {
// Write stats in a format that can be interpreted as counters by
// streaming map-reduce jobs.
System.err.println("reporter:counter:numKeys," + reporterId + ","
+ numKeysDelta);
System.err.println("reporter:counter:numCols," + reporterId + ","
+ numCols.get());
System.err.println("reporter:counter:avgKeysPerSecond," + reporterId
+ "," + (long) (avgKeysPerSecondDelta));
}
}
public void setDataSize(int minDataSize, int maxDataSize) {
this.minDataSize = minDataSize;
this.maxDataSize = maxDataSize;
}
public void waitForFinish() {
while (numThreadsWorking.get() != 0) {
Threads.sleepWithoutInterrupt(1000);
}
}
protected void startThreads(Collection<? extends Thread> threads) {
numThreadsWorking.addAndGet(threads.size());
for (Thread thread : threads) {
thread.start();
}
}
/** @return the end key of the key range, exclusive */
public long getEndKey() {
return endKey;
}
/** Returns a task-specific progress string */
protected abstract String progressInfo();
protected static void appendToStatus(StringBuilder sb, String desc,
long v) {
if (v == 0) {
return;
}
sb.append(", ");
sb.append(desc);
sb.append("=");
sb.append(v);
}
}

View File

@@ -0,0 +1,320 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with this
* work for additional information regarding copyright ownership. The ASF
* licenses this file to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations
* under the License.
*/
package org.apache.hadoop.hbase.util;
import java.io.IOException;
import java.util.HashSet;
import java.util.List;
import java.util.Random;
import java.util.Set;
import java.util.concurrent.atomic.AtomicLong;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HRegionLocation;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.client.Get;
import org.apache.hadoop.hbase.client.HTable;
import org.apache.hadoop.hbase.client.Result;
/** Creates multiple threads that read and verify previously written data */
public class MultiThreadedReader extends MultiThreadedAction
{
private static final Log LOG = LogFactory.getLog(MultiThreadedReader.class);
private Set<HBaseReaderThread> readers = new HashSet<HBaseReaderThread>();
private final double verifyPercent;
private volatile boolean aborted;
private MultiThreadedWriter writer = null;
/**
* The number of keys verified in a sequence. This will never be larger than
* the total number of keys in the range. The reader might also verify
* random keys when it catches up with the writer.
*/
private final AtomicLong numUniqueKeysVerified = new AtomicLong();
/**
* Default maximum number of read errors to tolerate before shutting down all
* readers.
*/
public static final int DEFAULT_MAX_ERRORS = 10;
/**
* Default "window" size between the last key written by the writer and the
* key that we attempt to read. The lower this number, the stricter our
* testing is. If this is zero, we always attempt to read the highest key
* in the contiguous sequence of keys written by the writers.
*/
public static final int DEFAULT_KEY_WINDOW = 0;
protected AtomicLong numKeysVerified = new AtomicLong(0);
private AtomicLong numReadErrors = new AtomicLong(0);
private AtomicLong numReadFailures = new AtomicLong(0);
private int maxErrors = DEFAULT_MAX_ERRORS;
private int keyWindow = DEFAULT_KEY_WINDOW;
public MultiThreadedReader(Configuration conf, byte[] tableName,
byte[] columnFamily, double verifyPercent) {
super(conf, tableName, columnFamily, "R");
this.verifyPercent = verifyPercent;
}
public void linkToWriter(MultiThreadedWriter writer) {
this.writer = writer;
writer.setTrackInsertedKeys(true);
}
public void setMaxErrors(int maxErrors) {
this.maxErrors = maxErrors;
}
public void setKeyWindow(int keyWindow) {
this.keyWindow = keyWindow;
}
@Override
public void start(long startKey, long endKey, int numThreads)
throws IOException {
super.start(startKey, endKey, numThreads);
if (verbose) {
LOG.debug("Reading keys [" + startKey + ", " + endKey + ")");
}
for (int i = 0; i < numThreads; ++i) {
HBaseReaderThread reader = new HBaseReaderThread(i);
readers.add(reader);
}
startThreads(readers);
}
public class HBaseReaderThread extends Thread {
private final int readerId;
private final HTable table;
private final Random random = new Random();
/** The "current" key being read. Increases from startKey to endKey. */
private long curKey;
/** Time when the thread started */
private long startTimeMs;
/** If we are ahead of the writer and reading a random key. */
private boolean readingRandomKey;
/**
* @param readerId only the keys with this remainder from division by
* {@link #numThreads} will be read by this thread
*/
public HBaseReaderThread(int readerId) throws IOException {
this.readerId = readerId;
table = new HTable(conf, tableName);
setName(getClass().getSimpleName() + "_" + readerId);
}
@Override
public void run() {
try {
runReader();
} finally {
try {
table.close();
} catch (IOException e) {
LOG.error("Error closing table", e);
}
numThreadsWorking.decrementAndGet();
}
}
private void runReader() {
if (verbose) {
LOG.info("Started thread #" + readerId + " for reads...");
}
startTimeMs = System.currentTimeMillis();
curKey = startKey;
while (curKey < endKey && !aborted) {
long k = getNextKeyToRead();
// A sanity check for the key range.
if (k < startKey || k >= endKey) {
numReadErrors.incrementAndGet();
throw new AssertionError("Load tester logic error: proposed key " +
"to read " + k + " is out of range (startKey=" + startKey +
", endKey=" + endKey + ")");
}
if (k % numThreads != readerId ||
(writer != null && writer.failedToWriteKey(k))) {
// Skip keys that this thread should not read, as well as the keys
// that we know the writer failed to write.
continue;
}
readKey(k);
if (k == curKey - 1 && !readingRandomKey) {
// We have verified another unique key.
numUniqueKeysVerified.incrementAndGet();
}
}
}
/**
* Should only be used for the concurrent writer/reader workload. The
* maximum key we are allowed to read, subject to the "key window"
* constraint.
*/
private long maxKeyWeCanRead() {
long insertedUpToKey = writer.insertedUpToKey();
if (insertedUpToKey >= endKey - 1) {
// The writer has finished writing our range, so we can read any
// key in the range.
return endKey - 1;
}
return Math.min(endKey - 1, insertedUpToKey - keyWindow);
}
private long getNextKeyToRead() {
readingRandomKey = false;
if (writer == null || curKey <= maxKeyWeCanRead()) {
return curKey++;
}
// We caught up with the writer. See if we can read any keys at all.
long maxKeyToRead;
while ((maxKeyToRead = maxKeyWeCanRead()) < startKey) {
// The writer has not written sufficient keys for us to be able to read
// anything at all. Sleep a bit. This should only happen in the
// beginning of a load test run.
Threads.sleepWithoutInterrupt(50);
}
if (curKey <= maxKeyToRead) {
// The writer wrote some keys, and we are now allowed to read our
// current key.
return curKey++;
}
// startKey <= maxKeyToRead <= curKey - 1. Read one of the previous keys.
// Don't increment the current key -- we still have to try reading it
// later. Set a flag to make sure that we don't count this key towards
// the set of unique keys we have verified.
readingRandomKey = true;
return startKey + Math.abs(random.nextLong())
% (maxKeyToRead - startKey + 1);
}
private Get readKey(long keyToRead) {
Get get = new Get(
LoadTestKVGenerator.md5PrefixedKey(keyToRead).getBytes());
get.addFamily(columnFamily);
try {
if (verbose) {
LOG.info("[" + readerId + "] " + "Querying key " + keyToRead
+ ", cf " + Bytes.toStringBinary(columnFamily));
}
queryKey(get, random.nextInt(100) < verifyPercent);
} catch (IOException e) {
numReadFailures.addAndGet(1);
LOG.debug("[" + readerId + "] FAILED read, key = " + (keyToRead + "")
+ ", time from start: "
+ (System.currentTimeMillis() - startTimeMs) + " ms");
}
return get;
}
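/**
 * Issues the given Get, records timing and key/column counters, and, when
 * requested, verifies every returned column against the deterministic value
 * generator.
 */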
public void queryKey(Get get, boolean verify) throws IOException {
String rowKey = new String(get.getRow());
// read the data
long start = System.currentTimeMillis();
Result result = table.get(get);
totalOpTimeMs.addAndGet(System.currentTimeMillis() - start);
numKeys.addAndGet(1);
// if we got no data report error
if (result.isEmpty()) {
HRegionLocation hloc = table.getRegionLocation(
Bytes.toBytes(rowKey));
LOG.info("Key = " + rowKey + ", RegionServer: "
+ hloc.getHostname());
numReadErrors.addAndGet(1);
LOG.error("No data returned, tried to get actions for key = "
+ rowKey + (writer == null ? "" : ", keys inserted by writer: " +
writer.numKeys.get() + ")"));
if (numReadErrors.get() > maxErrors) {
LOG.error("Aborting readers -- found more than " + maxErrors
+ " errors\n");
aborted = true;
}
}
if (result.getFamilyMap(columnFamily) != null) {
// increment number of columns read
numCols.addAndGet(result.getFamilyMap(columnFamily).size());
if (verify) {
// verify the result
List<KeyValue> keyValues = result.list();
for (KeyValue kv : keyValues) {
String qual = new String(kv.getQualifier());
// if something does not look right report it
if (!LoadTestKVGenerator.verify(rowKey, qual, kv.getValue())) {
numReadErrors.addAndGet(1);
LOG.error("Error checking data for key = " + rowKey
+ ", actionId = " + qual);
}
}
numKeysVerified.addAndGet(1);
}
}
}
}
public long getNumReadFailures() {
return numReadFailures.get();
}
public long getNumReadErrors() {
return numReadErrors.get();
}
public long getNumKeysVerified() {
return numKeysVerified.get();
}
public long getNumUniqueKeysVerified() {
return numUniqueKeysVerified.get();
}
@Override
protected String progressInfo() {
StringBuilder sb = new StringBuilder();
appendToStatus(sb, "verified", numKeysVerified.get());
appendToStatus(sb, "READ FAILURES", numReadFailures.get());
appendToStatus(sb, "READ ERRORS", numReadErrors.get());
return sb.toString();
}
}

View File

@ -0,0 +1,310 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with this
* work for additional information regarding copyright ownership. The ASF
* licenses this file to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations
* under the License.
*/
package org.apache.hadoop.hbase.util;
import java.io.IOException;
import java.util.HashSet;
import java.util.PriorityQueue;
import java.util.Queue;
import java.util.Random;
import java.util.Set;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ConcurrentSkipListSet;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.client.HTable;
import org.apache.hadoop.hbase.client.Put;
/** Creates multiple threads that write key/value pairs into the HBase table */
public class MultiThreadedWriter extends MultiThreadedAction {
private static final Log LOG = LogFactory.getLog(MultiThreadedWriter.class);
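// Typical usage (sketch; see RestartMetaTest#loadData for a complete example):
//   MultiThreadedWriter writer = new MultiThreadedWriter(conf, tableName, columnFamily);
//   writer.setMultiPut(true);
//   writer.setColumnsPerKey(minCols, maxCols);
//   writer.start(startKey, endKey, numThreads);
//   writer.waitForFinish();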
private long minColumnsPerKey = 1;
private long maxColumnsPerKey = 10;
private Set<HBaseWriterThread> writers = new HashSet<HBaseWriterThread>();
private boolean isMultiPut = false;
/**
* A temporary place to keep track of inserted keys. This is written to by
* all writers and is drained on a separate thread that populates
* {@link #insertedUpToKey}, the maximum key in the contiguous range of keys
* being inserted. This queue is supposed to stay small.
*/
private BlockingQueue<Long> insertedKeys =
new ArrayBlockingQueue<Long>(10000);
/**
* This is the current key to be inserted by any thread. Each thread does an
* atomic get and increment operation and inserts the current value.
*/
private AtomicLong nextKeyToInsert = new AtomicLong();
/**
* The highest key in the contiguous range of keys inserted so far.
*/
private AtomicLong insertedUpToKey = new AtomicLong();
/** The sorted set of keys NOT inserted by the writers */
private Set<Long> failedKeySet = new ConcurrentSkipListSet<Long>();
/**
* The total number of keys sitting in the temporary queues, i.e. inserted but
* not yet lined up into the contiguous sequence starting from startKey.
* Supposed to stay small.
*/
private AtomicLong insertedKeyQueueSize = new AtomicLong();
/** Enable this if used in conjunction with a concurrent reader. */
private boolean trackInsertedKeys;
public MultiThreadedWriter(Configuration conf, byte[] tableName,
byte[] columnFamily) {
super(conf, tableName, columnFamily, "W");
}
/** Use multi-puts vs. separate puts for every column in a row */
public void setMultiPut(boolean isMultiPut) {
this.isMultiPut = isMultiPut;
}
public void setColumnsPerKey(long minColumnsPerKey, long maxColumnsPerKey) {
this.minColumnsPerKey = minColumnsPerKey;
this.maxColumnsPerKey = maxColumnsPerKey;
}
@Override
public void start(long startKey, long endKey, int numThreads)
throws IOException {
super.start(startKey, endKey, numThreads);
if (verbose) {
LOG.debug("Inserting keys [" + startKey + ", " + endKey + ")");
}
nextKeyToInsert.set(startKey);
insertedUpToKey.set(startKey - 1);
for (int i = 0; i < numThreads; ++i) {
HBaseWriterThread writer = new HBaseWriterThread(i);
writers.add(writer);
}
if (trackInsertedKeys) {
new Thread(new InsertedKeysTracker()).start();
numThreadsWorking.incrementAndGet();
}
startThreads(writers);
}
public static byte[] longToByteArrayKey(long rowKey) {
return LoadTestKVGenerator.md5PrefixedKey(rowKey).getBytes();
}
private class HBaseWriterThread extends Thread {
private final HTable table;
private final int writerId;
private final Random random = new Random();
private final LoadTestKVGenerator dataGenerator = new LoadTestKVGenerator(
minDataSize, maxDataSize);
public HBaseWriterThread(int writerId) throws IOException {
setName(getClass().getSimpleName() + "_" + writerId);
table = new HTable(conf, tableName);
this.writerId = writerId;
}
@Override
public void run() {
try {
long rowKey;
while ((rowKey = nextKeyToInsert.getAndIncrement()) < endKey) {
long numColumns = minColumnsPerKey + Math.abs(random.nextLong())
% (maxColumnsPerKey - minColumnsPerKey + 1);
numKeys.addAndGet(1);
if (isMultiPut) {
multiPutInsertKey(rowKey, 0, numColumns);
} else {
for (long col = 0; col < numColumns; ++col) {
insert(rowKey, col);
}
}
if (trackInsertedKeys) {
insertedKeys.add(rowKey);
}
}
} finally {
try {
table.close();
} catch (IOException e) {
LOG.error("Error closing table", e);
}
numThreadsWorking.decrementAndGet();
}
}
public void insert(long rowKey, long col) {
Put put = new Put(longToByteArrayKey(rowKey));
String colAsStr = String.valueOf(col);
put.add(columnFamily, colAsStr.getBytes(),
dataGenerator.generateRandomSizeValue(rowKey, colAsStr));
try {
long start = System.currentTimeMillis();
table.put(put);
numCols.addAndGet(1);
totalOpTimeMs.addAndGet(System.currentTimeMillis() - start);
} catch (IOException e) {
failedKeySet.add(rowKey);
LOG.error("Failed to insert: " + rowKey);
e.printStackTrace();
}
}
public void multiPutInsertKey(long rowKey, long startCol, long endCol) {
if (verbose) {
LOG.debug("Preparing put for key = " + rowKey + ", cols = ["
+ startCol + ", " + endCol + ")");
}
if (startCol >= endCol) {
return;
}
Put put = new Put(LoadTestKVGenerator.md5PrefixedKey(
rowKey).getBytes());
byte[] columnQualifier;
byte[] value;
for (long i = startCol; i < endCol; ++i) {
String qualStr = String.valueOf(i);
columnQualifier = qualStr.getBytes();
value = dataGenerator.generateRandomSizeValue(rowKey, qualStr);
put.add(columnFamily, columnQualifier, value);
}
try {
long start = System.currentTimeMillis();
table.put(put);
numCols.addAndGet(endCol - startCol);
totalOpTimeMs.addAndGet(
System.currentTimeMillis() - start);
} catch (IOException e) {
failedKeySet.add(rowKey);
e.printStackTrace();
}
}
}
/**
* A thread that keeps track of the highest key in the contiguous range of
* inserted keys.
*/
private class InsertedKeysTracker implements Runnable {
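// Drains insertedKeys; keys that arrive out of order are parked in a priority
// queue, and insertedUpToKey only advances once the gap below them is filled.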
@Override
public void run() {
Thread.currentThread().setName(getClass().getSimpleName());
try {
long expectedKey = startKey;
Queue<Long> sortedKeys = new PriorityQueue<Long>();
while (expectedKey < endKey) {
// Block until a new element is available.
Long k;
try {
k = insertedKeys.poll(1, TimeUnit.SECONDS);
} catch (InterruptedException e) {
LOG.info("Inserted key tracker thread interrupted", e);
break;
}
if (k == null) {
continue;
}
if (k == expectedKey) {
// Skip the "sorted key" queue and consume this key.
insertedUpToKey.set(k);
++expectedKey;
} else {
sortedKeys.add(k);
}
// See if we have a sequence of contiguous keys lined up.
while (!sortedKeys.isEmpty()
&& ((k = sortedKeys.peek()) == expectedKey)) {
sortedKeys.poll();
insertedUpToKey.set(k);
++expectedKey;
}
insertedKeyQueueSize.set(insertedKeys.size() + sortedKeys.size());
}
} catch (Exception ex) {
LOG.error("Error in inserted key tracker", ex);
} finally {
numThreadsWorking.decrementAndGet();
}
}
}
@Override
public void waitForFinish() {
super.waitForFinish();
System.out.println("Failed to write keys: " + failedKeySet.size());
for (Long key : failedKeySet) {
System.out.println("Failed to write key: " + key);
}
}
public int getNumWriteFailures() {
return failedKeySet.size();
}
/**
* The max key until which all keys have been inserted (successfully or not).
* @return the last key that we have inserted all keys up to (inclusive)
*/
public long insertedUpToKey() {
return insertedUpToKey.get();
}
public boolean failedToWriteKey(long k) {
return failedKeySet.contains(k);
}
@Override
protected String progressInfo() {
StringBuilder sb = new StringBuilder();
appendToStatus(sb, "insertedUpTo", insertedUpToKey.get());
appendToStatus(sb, "insertedQSize", insertedKeyQueueSize.get());
return sb.toString();
}
/**
* Used for a joint write/read workload. Enables tracking the last inserted
* key, which requires a blocking queue and a consumer thread.
* @param enable whether to enable tracking the last inserted key
*/
void setTrackInsertedKeys(boolean enable) {
trackInsertedKeys = enable;
}
}

View File

@ -0,0 +1,339 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with this
* work for additional information regarding copyright ownership. The ASF
* licenses this file to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations
* under the License.
*/
package org.apache.hadoop.hbase.util;
import java.io.BufferedReader;
import java.io.BufferedWriter;
import java.io.File;
import java.io.FileWriter;
import java.io.IOException;
import java.io.InputStreamReader;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Scanner;
import java.util.TreeMap;
import org.apache.commons.io.FileUtils;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.RawLocalFileSystem;
import org.apache.hadoop.hbase.HBaseTestingUtility;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.MiniHBaseCluster;
import org.apache.hadoop.hbase.client.HTable;
import org.apache.hadoop.hbase.zookeeper.ZKUtil;
/**
* A helper class for process-based mini-cluster tests. Unlike
* {@link MiniHBaseCluster}, it starts daemons as separate processes, which
* allows real kill testing.
*/
public class ProcessBasedLocalHBaseCluster {
private static final String DEFAULT_WORKDIR =
"/tmp/hbase-" + System.getenv("USER");
private final String hbaseHome;
private final String workDir;
private int numRegionServers;
private final int zkClientPort;
private final int masterPort;
private final Configuration conf;
private static final int MAX_FILE_SIZE_OVERRIDE = 10 * 1000 * 1000;
private static final Log LOG = LogFactory.getLog(
ProcessBasedLocalHBaseCluster.class);
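// Typical usage (sketch; see RestartMetaTest for a complete example):
//   ProcessBasedLocalHBaseCluster cluster =
//       new ProcessBasedLocalHBaseCluster(conf, hbaseHome, numRegionServers);
//   cluster.start();
//   cluster.killRegionServer(port);
//   cluster.startRegionServer(port);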
private List<String> daemonPidFiles =
Collections.synchronizedList(new ArrayList<String>());
private boolean shutdownHookInstalled;
private String hbaseDaemonScript;
/**
* Constructor. Modifies the passed configuration.
* @param hbaseHome the top directory of the HBase source tree
*/
public ProcessBasedLocalHBaseCluster(Configuration conf, String hbaseHome,
int numRegionServers) {
this.conf = conf;
this.hbaseHome = hbaseHome;
this.numRegionServers = numRegionServers;
this.workDir = DEFAULT_WORKDIR;
hbaseDaemonScript = hbaseHome + "/bin/hbase-daemon.sh";
zkClientPort = HBaseTestingUtility.randomFreePort();
masterPort = HBaseTestingUtility.randomFreePort();
conf.set(HConstants.ZOOKEEPER_QUORUM, HConstants.LOCALHOST);
conf.setInt(HConstants.ZOOKEEPER_CLIENT_PORT, zkClientPort);
}
public void start() throws IOException {
cleanupOldState();
// start ZK
LOG.info("Starting ZooKeeper");
startZK();
HBaseTestingUtility.waitForHostPort(HConstants.LOCALHOST, zkClientPort);
startMaster();
ZKUtil.waitForBaseZNode(conf);
for (int idx = 0; idx < numRegionServers; idx++) {
startRegionServer(HBaseTestingUtility.randomFreePort());
}
LOG.info("Waiting for HBase startup by scanning META");
int attemptsLeft = 10;
while (attemptsLeft-- > 0) {
try {
new HTable(conf, HConstants.META_TABLE_NAME);
break;
} catch (Exception e) {
LOG.info("Waiting for HBase to startup. Retries left: " + attemptsLeft,
e);
Threads.sleep(1000);
}
}
LOG.info("Process-based HBase Cluster with " + numRegionServers +
" region servers up and running... \n\n");
}
public void startRegionServer(int port) {
startServer("regionserver", port);
}
public void startMaster() {
startServer("master", 0);
}
public void killRegionServer(int port) throws IOException {
killServer("regionserver", port);
}
public void killMaster() throws IOException {
killServer("master", 0);
}
public void startZK() {
startServer("zookeeper", 0);
}
private void executeCommand(String command) {
ensureShutdownHookInstalled();
executeCommand(command, null);
}
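/**
 * Runs the given shell command, optionally layering the provided environment
 * overrides on top of the current environment, and echoes its stdout and
 * stderr to this process's stdout.
 */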
private void executeCommand(String command,
Map<String, String> envOverrides) {
LOG.debug("Command : " + command);
try {
String [] envp = null;
if (envOverrides != null) {
Map<String, String> map = new HashMap<String, String>(
System.getenv());
map.putAll(envOverrides);
envp = new String[map.size()];
int idx = 0;
for (Map.Entry<String, String> e: map.entrySet()) {
envp[idx++] = e.getKey() + "=" + e.getValue();
}
}
Process p = Runtime.getRuntime().exec(command, envp);
BufferedReader stdInput = new BufferedReader(
new InputStreamReader(p.getInputStream()));
BufferedReader stdError = new BufferedReader(
new InputStreamReader(p.getErrorStream()));
// read the output from the command
String s = null;
while ((s = stdInput.readLine()) != null) {
System.out.println(s);
}
// read any errors from the attempted command
while ((s = stdError.readLine()) != null) {
System.out.println(s);
}
} catch (IOException e) {
LOG.error("Error running: " + command, e);
}
}
private void shutdownAllProcesses() {
LOG.info("Killing daemons using pid files");
final List<String> pidFiles = new ArrayList<String>(daemonPidFiles);
for (String pidFile : pidFiles) {
int pid = 0;
try {
pid = readPidFromFile(pidFile);
} catch (IOException ex) {
LOG.error("Could not kill process with pid from " + pidFile);
}
if (pid > 0) {
LOG.info("Killing pid " + pid + " (" + pidFile + ")");
killProcess(pid);
}
}
LOG.info("Waiting a bit to let processes terminate");
Threads.sleep(5000);
}
private void ensureShutdownHookInstalled() {
if (shutdownHookInstalled) {
return;
}
Runtime.getRuntime().addShutdownHook(new Thread(new Runnable() {
@Override
public void run() {
shutdownAllProcesses();
}
}));
shutdownHookInstalled = true;
}
private void cleanupOldState() {
executeCommand("rm -rf " + workDir);
}
private void writeStringToFile(String s, String fileName) {
try {
BufferedWriter out = new BufferedWriter(new FileWriter(fileName));
out.write(s);
out.close();
} catch (IOException e) {
LOG.error("Error writing to: " + fileName, e);
}
}
private String serverWorkingDir(String serverName, int port) {
String dir;
if (serverName.equals("regionserver")) {
dir = workDir + "/" + serverName + "-" + port;
} else {
dir = workDir + "/" + serverName;
}
return dir;
}
private int getServerPID(String serverName, int port) throws IOException {
String pidFile = pidFilePath(serverName, port);
return readPidFromFile(pidFile);
}
private static int readPidFromFile(String pidFile) throws IOException {
Scanner scanner = new Scanner(new File(pidFile));
try {
return scanner.nextInt();
} finally {
scanner.close();
}
}
private String pidFilePath(String serverName, int port) {
String dir = serverWorkingDir(serverName, port);
String user = System.getenv("USER");
String pidFile = String.format("%s/hbase-%s-%s.pid",
dir, user, serverName);
return pidFile;
}
private void killServer(String serverName, int port) throws IOException {
int pid = getServerPID(serverName, port);
if (pid > 0) {
LOG.info("Killing " + serverName + "; pid=" + pid);
killProcess(pid);
}
}
private void killProcess(int pid) {
String cmd = "kill -s KILL " + pid;
executeCommand(cmd);
}
private void startServer(String serverName, int rsPort) {
String confAsString = generateConfig(rsPort);
// Create a working directory for this server and write its configuration.
String dir = serverWorkingDir(serverName, rsPort);
executeCommand("mkdir -p " + dir);
writeStringToFile(confAsString, dir + "/" + "hbase-site.xml");
Map<String, String> envOverrides = new HashMap<String, String>();
envOverrides.put("HBASE_LOG_DIR", dir);
envOverrides.put("HBASE_PID_DIR", dir);
try {
FileUtils.copyFile(
new File(hbaseHome, "conf/log4j.properties"),
new File(dir, "log4j.properties"));
} catch (IOException ex) {
LOG.error("Could not install log4j.properties into " + dir);
}
executeCommand(hbaseDaemonScript + " --config " + dir +
" start " + serverName, envOverrides);
daemonPidFiles.add(pidFilePath(serverName, rsPort));
}
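/**
 * Generates a minimal hbase-site.xml for a daemon started by this class,
 * pointing it at the local ZooKeeper/master ports and overriding the maximum
 * region file size.
 */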
private String generateConfig(int rsPort) {
StringBuilder sb = new StringBuilder();
Map<String, Object> confMap = new TreeMap<String, Object>();
confMap.put(HConstants.CLUSTER_DISTRIBUTED, true);
if (rsPort > 0) {
confMap.put(HConstants.REGIONSERVER_PORT, rsPort);
confMap.put(HConstants.REGIONSERVER_INFO_PORT_AUTO, true);
}
confMap.put(HConstants.ZOOKEEPER_CLIENT_PORT, zkClientPort);
confMap.put(HConstants.MASTER_PORT, masterPort);
confMap.put(HConstants.HREGION_MAX_FILESIZE, MAX_FILE_SIZE_OVERRIDE);
confMap.put("fs.file.impl", RawLocalFileSystem.class.getName());
sb.append("<configuration>\n");
for (Map.Entry<String, Object> entry : confMap.entrySet()) {
sb.append(" <property>\n");
sb.append(" <name>" + entry.getKey() + "</name>\n");
sb.append(" <value>" + entry.getValue() + "</value>\n");
sb.append(" </property>\n");
}
sb.append("</configuration>\n");
return sb.toString();
}
public Configuration getConf() {
return conf;
}
}

View File

@ -0,0 +1,155 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with this
* work for additional information regarding copyright ownership. The ASF
* licenses this file to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations
* under the License.
*/
package org.apache.hadoop.hbase.util;
import java.io.File;
import java.io.IOException;
import org.apache.commons.cli.CommandLine;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.hbase.HBaseTestingUtility;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.client.HTable;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.ResultScanner;
import org.apache.hadoop.hbase.client.Scan;
/**
* A command-line tool that spins up a local process-based cluster, loads
* some data, restarts the regionserver holding .META., and verifies that the
* cluster recovers.
*/
public class RestartMetaTest extends AbstractHBaseTool {
private static final Log LOG = LogFactory.getLog(RestartMetaTest.class);
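// Example invocation (sketch; option names as registered in addOptions()):
//   hbase org.apache.hadoop.hbase.util.RestartMetaTest \
//       -hbase_home /path/to/hbase -num_rs 2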
/** The number of region servers used if not specified */
private static final int DEFAULT_NUM_RS = 2;
/** Table name for the test */
private static final byte[] TABLE_NAME = Bytes.toBytes("load_test");
/** The number of seconds to sleep after loading the data */
private static final int SLEEP_SEC_AFTER_DATA_LOAD = 5;
/** The actual number of region servers */
private int numRegionServers;
/** Top-level directory of the HBase source tree (HBase home) */
private String hbaseHome;
private static final String OPT_HBASE_HOME = "hbase_home";
private static final String OPT_NUM_RS = "num_rs";
/** Loads data into the table using the multi-threaded writer. */
private void loadData() throws IOException {
long startKey = 0;
long endKey = 100000;
long minColsPerKey = 5;
long maxColsPerKey = 15;
int minColDataSize = 256;
int maxColDataSize = 256 * 3;
int numThreads = 10;
// print out the arguments
System.out.printf("Key range %d .. %d\n", startKey, endKey);
System.out.printf("Number of Columns/Key: %d..%d\n", minColsPerKey,
maxColsPerKey);
System.out.printf("Data Size/Column: %d..%d bytes\n", minColDataSize,
maxColDataSize);
System.out.printf("Client Threads: %d\n", numThreads);
// start the writers
MultiThreadedWriter writer = new MultiThreadedWriter(conf, TABLE_NAME,
LoadTestTool.COLUMN_FAMILY);
writer.setMultiPut(true);
writer.setColumnsPerKey(minColsPerKey, maxColsPerKey);
writer.setDataSize(minColDataSize, maxColDataSize);
writer.start(startKey, endKey, numThreads);
System.out.printf("Started loading data...");
writer.waitForFinish();
System.out.printf("Finished loading data...");
}
@Override
protected void doWork() throws IOException {
ProcessBasedLocalHBaseCluster hbaseCluster =
new ProcessBasedLocalHBaseCluster(conf, hbaseHome, numRegionServers);
// start the process based HBase cluster
hbaseCluster.start();
// create tables if needed
HBaseTestingUtility.createPreSplitLoadTestTable(conf, TABLE_NAME,
LoadTestTool.COLUMN_FAMILY);
LOG.debug("Loading data....\n\n");
loadData();
LOG.debug("Sleeping for " + SLEEP_SEC_AFTER_DATA_LOAD +
" seconds....\n\n");
Threads.sleep(1000 * SLEEP_SEC_AFTER_DATA_LOAD);
int metaRSPort = HBaseTestingUtility.getMetaRSPort(conf);
LOG.debug("Killing META region server running on port " + metaRSPort);
hbaseCluster.killRegionServer(metaRSPort);
Threads.sleep(2000);
LOG.debug("Restarting region server running on port metaRSPort");
hbaseCluster.startRegionServer(metaRSPort);
Threads.sleep(2000);
LOG.debug("Trying to scan meta");
HTable metaTable = new HTable(conf, HConstants.META_TABLE_NAME);
ResultScanner scanner = metaTable.getScanner(new Scan());
Result result;
while ((result = scanner.next()) != null) {
LOG.info("Region assignment from META: "
+ Bytes.toStringBinary(result.getRow())
+ " => "
+ Bytes.toStringBinary(result.getFamilyMap(HConstants.CATALOG_FAMILY)
.get(HConstants.SERVER_QUALIFIER)));
}
scanner.close();
metaTable.close();
}
@Override
protected void addOptions() {
addRequiredOptWithArg(OPT_HBASE_HOME, "HBase home directory");
addOptWithArg(OPT_NUM_RS, "Number of Region Servers");
}
@Override
protected void processOptions(CommandLine cmd) {
hbaseHome = cmd.getOptionValue(OPT_HBASE_HOME);
if (hbaseHome == null || !new File(hbaseHome).isDirectory()) {
throw new IllegalArgumentException("Invalid HBase home directory: " +
hbaseHome);
}
LOG.info("Using HBase home directory " + hbaseHome);
numRegionServers = Integer.parseInt(cmd.getOptionValue(OPT_NUM_RS,
String.valueOf(DEFAULT_NUM_RS)));
}
public static void main(String[] args) {
new RestartMetaTest().doStaticMain(args);
}
}

View File

@ -25,6 +25,7 @@ import java.io.DataInputStream;
 import java.io.DataOutputStream;
 import java.io.IOException;
 import java.math.BigDecimal;
+import java.math.BigInteger;
 import java.util.Arrays;
 import junit.framework.TestCase;

View File

@ -0,0 +1,74 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with this
* work for additional information regarding copyright ownership. The ASF
* licenses this file to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations
* under the License.
*/
package org.apache.hadoop.hbase.util;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;
import java.util.HashSet;
import java.util.Random;
import java.util.Set;
import org.apache.hadoop.hbase.SmallTests;
import org.junit.Test;
import org.junit.experimental.categories.Category;
@Category(SmallTests.class)
public class TestLoadTestKVGenerator {
private static final int MIN_LEN = 10;
private static final int MAX_LEN = 20;
private Random rand = new Random(28937293L);
private LoadTestKVGenerator gen = new LoadTestKVGenerator(MIN_LEN, MAX_LEN);
@Test
public void testValueLength() {
for (int i = 0; i < 1000; ++i) {
byte[] v = gen.generateRandomSizeValue(i,
String.valueOf(rand.nextInt()));
assertTrue(MIN_LEN <= v.length);
assertTrue(v.length <= MAX_LEN);
}
}
@Test
public void testVerification() {
for (int i = 0; i < 1000; ++i) {
for (int qualIndex = 0; qualIndex < 20; ++qualIndex) {
String qual = String.valueOf(qualIndex);
byte[] v = gen.generateRandomSizeValue(i, qual);
String rowKey = LoadTestKVGenerator.md5PrefixedKey(i);
assertTrue(LoadTestKVGenerator.verify(rowKey, qual, v));
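// Corrupting a single byte of the value must make verification fail.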
v[0]++;
assertFalse(LoadTestKVGenerator.verify(rowKey, qual, v));
}
}
}
@Test
public void testCorrectAndUniqueKeys() {
Set<String> keys = new HashSet<String>();
for (int i = 0; i < 1000; ++i) {
String k = LoadTestKVGenerator.md5PrefixedKey(i);
assertFalse(keys.contains(k));
assertTrue(k.endsWith(":" + i));
keys.add(k);
}
}
}

View File

@ -70,7 +70,7 @@ public class TestMergeTable {
 desc.addFamily(new HColumnDescriptor(COLUMN_NAME));
 // Set maximum regionsize down.
-UTIL.getConfiguration().setLong("hbase.hregion.max.filesize", 64L * 1024L * 1024L);
+UTIL.getConfiguration().setLong(HConstants.HREGION_MAX_FILESIZE, 64L * 1024L * 1024L);
 // Make it so we don't split.
 UTIL.getConfiguration().setInt("hbase.regionserver.regionSplitLimit", 0);

View File

@ -0,0 +1,58 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with this
* work for additional information regarding copyright ownership. The ASF
* licenses this file to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations
* under the License.
*/
package org.apache.hadoop.hbase.util;
import static org.junit.Assert.assertEquals;
import org.apache.hadoop.hbase.LargeTests;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;
/**
* A write/read/verify load test on a mini HBase cluster. Tests reading
* and writing at the same time.
*/
@Category(LargeTests.class)
@RunWith(Parameterized.class)
public class TestMiniClusterLoadParallel
extends TestMiniClusterLoadSequential {
public TestMiniClusterLoadParallel(boolean isMultiPut) {
super(isMultiPut);
}
@Test(timeout=120000)
public void loadTest() throws Exception {
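// Readers run concurrently with writers and trail them within the key window.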
prepareForLoadTest();
readerThreads.linkToWriter(writerThreads);
writerThreads.start(0, NUM_KEYS, NUM_THREADS);
readerThreads.start(0, NUM_KEYS, NUM_THREADS);
writerThreads.waitForFinish();
readerThreads.waitForFinish();
assertEquals(0, writerThreads.getNumWriteFailures());
assertEquals(0, readerThreads.getNumReadFailures());
assertEquals(0, readerThreads.getNumReadErrors());
assertEquals(NUM_KEYS, readerThreads.getNumUniqueKeysVerified());
}
}

View File

@ -0,0 +1,116 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with this
* work for additional information regarding copyright ownership. The ASF
* licenses this file to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations
* under the License.
*/
package org.apache.hadoop.hbase.util;
import static org.junit.Assert.assertEquals;
import java.io.IOException;
import java.util.Collection;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseTestingUtility;
import org.apache.hadoop.hbase.LargeTests;
import org.apache.hadoop.hbase.client.HBaseAdmin;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;
import org.junit.runners.Parameterized.Parameters;
/**
* A write/read/verify load test on a mini HBase cluster. Tests writing
* and then reading.
*/
@Category(LargeTests.class)
@RunWith(Parameterized.class)
public class TestMiniClusterLoadSequential {
private static final Log LOG = LogFactory.getLog(
TestMiniClusterLoadSequential.class);
protected static final byte[] TABLE = Bytes.toBytes("load_test_tbl");
protected static final byte[] CF = Bytes.toBytes("load_test_cf");
protected static final long NUM_KEYS = 10000;
protected static final int NUM_THREADS = 8;
protected static final int NUM_RS = 2;
protected static final HBaseTestingUtility TEST_UTIL =
new HBaseTestingUtility();
protected final Configuration conf = TEST_UTIL.getConfiguration();
protected final boolean isMultiPut;
protected MultiThreadedWriter writerThreads;
protected MultiThreadedReader readerThreads;
public TestMiniClusterLoadSequential(boolean isMultiPut) {
this.isMultiPut = isMultiPut;
}
@Parameters
public static Collection<Object[]> parameters() {
return HBaseTestingUtility.BOOLEAN_PARAMETERIZED;
}
@Before
public void setUp() throws Exception {
LOG.debug("Test setup: isMultiPut=" + isMultiPut);
TEST_UTIL.startMiniCluster(1, NUM_RS);
}
@After
public void tearDown() throws Exception {
LOG.debug("Test teardown: isMultiPut=" + isMultiPut);
TEST_UTIL.shutdownMiniCluster();
}
@Test(timeout=120000)
public void loadTest() throws Exception {
prepareForLoadTest();
writerThreads.start(0, NUM_KEYS, NUM_THREADS);
writerThreads.waitForFinish();
assertEquals(0, writerThreads.getNumWriteFailures());
readerThreads.start(0, NUM_KEYS, NUM_THREADS);
readerThreads.waitForFinish();
assertEquals(0, readerThreads.getNumReadFailures());
assertEquals(0, readerThreads.getNumReadErrors());
assertEquals(NUM_KEYS, readerThreads.getNumKeysVerified());
}
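/**
 * Waits until the expected number of region servers is online, creates a
 * pre-split load test table, and instantiates the writer and reader pools.
 */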
protected void prepareForLoadTest() throws IOException {
HBaseAdmin admin = new HBaseAdmin(conf);
while (admin.getClusterStatus().getServers().size() < NUM_RS) {
LOG.info("Sleeping until " + NUM_RS + " RSs are online");
Threads.sleepWithoutInterrupt(1000);
}
admin.close();
int numRegions =
HBaseTestingUtility.createPreSplitLoadTestTable(conf, TABLE, CF);
TEST_UTIL.waitUntilAllRegionsAssigned(numRegions);
writerThreads = new MultiThreadedWriter(conf, TABLE, CF);
writerThreads.setMultiPut(isMultiPut);
readerThreads = new MultiThreadedReader(conf, TABLE, CF, 100);
}
}