From 09d0aff149a0e5147c5245c3dd7d94281f8fefff Mon Sep 17 00:00:00 2001 From: Nicolas Spiegelberg Date: Thu, 8 Dec 2011 02:38:27 +0000 Subject: [PATCH] [jira] [HBASE-4908] HBase cluster test tool (port from 0.89-fb) Summary: Porting one of our HBase cluster test tools (a single-process multi-threaded load generator and verifier) from 0.89-fb to trunk. I cleaned up the code a bit compared to what's in 0.89-fb, and discovered that it has some features that I have not tried yet (some kind of a kill test, and some way to run HBase as multiple processes on one machine). The main utility of this piece of code for us has been the HBaseClusterTest command-line tool (called HBaseTest in 0.89-fb), which we usually invoke as a load test in our five-node dev cluster testing, e.g.: hbase org.apache.hadoop.hbase.util.LoadTestTool -write 50:100:20 -tn loadtest4 -read 100:10 -zk -bloom ROWCOL -compression LZO -key_window 5 -max_read_errors 10000 -num_keys 10000000000 -start_key 0 Test Plan: Run this on a dev cluster. Run all unit tests. Reviewers: stack, Karthik, Kannan, nspiegelberg, JIRA Reviewed By: nspiegelberg CC: stack, nspiegelberg, mbautin, Karthik Differential Revision: 549 git-svn-id: https://svn.apache.org/repos/asf/hbase/trunk@1211746 13f79535-47bb-0310-9956-ffa450edef68 --- pom.xml | 176 ++++----- .../org/apache/hadoop/hbase/EmptyWatcher.java | 4 +- .../org/apache/hadoop/hbase/HConstants.java | 24 +- .../hbase/client/HConnectionManager.java | 12 +- .../hbase/mapreduce/HFileOutputFormat.java | 30 +- .../regionserver/CompactSplitThread.java | 6 +- .../ConstantSizeRegionSplitPolicy.java | 6 +- .../hbase/regionserver/HRegionServer.java | 3 +- .../wal/SequenceFileLogReader.java | 39 +- .../hadoop/hbase/util/AbstractHBaseTool.java | 181 ++++++++++ .../org/apache/hadoop/hbase/util/Bytes.java | 35 +- .../org/apache/hadoop/hbase/util/HMerge.java | 4 +- .../org/apache/hadoop/hbase/util/Keying.java | 3 +- .../hadoop/hbase/util/RegionSplitter.java | 2 +- .../hadoop/hbase/zookeeper/ZKConfig.java | 4 +- .../apache/hadoop/hbase/zookeeper/ZKUtil.java | 50 ++- .../hadoop/hbase/HBaseTestingUtility.java | 114 +++++- .../apache/hadoop/hbase/client/TestAdmin.java | 28 +- .../coprocessor/TestCoprocessorInterface.java | 16 +- .../mapreduce/TestHFileOutputFormat.java | 62 ++-- .../hbase/regionserver/TestHRegion.java | 2 +- .../regionserver/TestRegionSplitPolicy.java | 36 +- .../regionserver/wal/TestLogRolling.java | 8 +- .../hbase/util/LoadTestKVGenerator.java | 101 ++++++ .../hadoop/hbase/util/LoadTestTool.java | 305 ++++++++++++++++ .../hbase/util/MultiThreadedAction.java | 205 +++++++++++ .../hbase/util/MultiThreadedReader.java | 320 +++++++++++++++++ .../hbase/util/MultiThreadedWriter.java | 310 ++++++++++++++++ .../util/ProcessBasedLocalHBaseCluster.java | 339 ++++++++++++++++++ .../hadoop/hbase/util/RestartMetaTest.java | 155 ++++++++ .../apache/hadoop/hbase/util/TestBytes.java | 5 +- .../hbase/util/TestLoadTestKVGenerator.java | 74 ++++ .../hadoop/hbase/util/TestMergeTable.java | 4 +- .../util/TestMiniClusterLoadParallel.java | 58 +++ .../util/TestMiniClusterLoadSequential.java | 116 ++++++ 35 files changed, 2584 insertions(+), 253 deletions(-) rename src/{test => main}/java/org/apache/hadoop/hbase/EmptyWatcher.java (92%) create mode 100644 src/main/java/org/apache/hadoop/hbase/util/AbstractHBaseTool.java create mode 100644 src/test/java/org/apache/hadoop/hbase/util/LoadTestKVGenerator.java create mode 100644 src/test/java/org/apache/hadoop/hbase/util/LoadTestTool.java create mode 100644 
src/test/java/org/apache/hadoop/hbase/util/MultiThreadedAction.java create mode 100644 src/test/java/org/apache/hadoop/hbase/util/MultiThreadedReader.java create mode 100644 src/test/java/org/apache/hadoop/hbase/util/MultiThreadedWriter.java create mode 100644 src/test/java/org/apache/hadoop/hbase/util/ProcessBasedLocalHBaseCluster.java create mode 100644 src/test/java/org/apache/hadoop/hbase/util/RestartMetaTest.java create mode 100644 src/test/java/org/apache/hadoop/hbase/util/TestLoadTestKVGenerator.java create mode 100644 src/test/java/org/apache/hadoop/hbase/util/TestMiniClusterLoadParallel.java create mode 100644 src/test/java/org/apache/hadoop/hbase/util/TestMiniClusterLoadSequential.java diff --git a/pom.xml b/pom.xml index ad8aacd0144..f8d822a0044 100644 --- a/pom.xml +++ b/pom.xml @@ -287,9 +287,9 @@ true - + - + ghelmling.testing @@ -302,7 +302,7 @@ true - + org.apache.maven.surefire ${surefire.provider} ${surefire.version} - + 900 @@ -349,13 +349,13 @@ org.apache.maven.plugins maven-failsafe-plugin ${surefire.version} - + org.apache.maven.surefire ${surefire.provider} ${surefire.version} - - + + ${integrationtest.include} @@ -364,7 +364,7 @@ ${unittest.include} **/*$* ${test.exclude.pattern} - + true ${env.LD_LIBRARY_PATH}:${project.build.directory}/nativelib @@ -639,22 +639,22 @@ test ${surefire.skipSecondPart} - false + false always none ${surefire.secondPartGroups} - + - + org.apache.maven.plugins maven-failsafe-plugin false - always - + always + maven-antrun-plugin @@ -867,6 +867,7 @@ 1.2 1.4 3.1 + 2.1 2.5 1.1.1 2.1 @@ -888,31 +889,31 @@ 3.4.0 0.0.1-SNAPSHOT - /usr - /etc/hbase + /usr + /etc/hbase /var/log/hbase /var/run/hbase - 1 + 1 - 0.91.0 + 0.91.0 ${project.artifactId}-${project.version} **/Test*.java **/IntegrationTest*.java - + 2.11-TRUNK-HBASE-2 surefire-junit47 - - - false + + + false false - + once none - 1 - - org.apache.hadoop.hbase.SmallTests + 1 + + org.apache.hadoop.hbase.SmallTests org.apache.hadoop.hbase.MediumTests @@ -954,6 +955,11 @@ commons-httpclient ${commons-httpclient.version} + + commons-io + commons-io + ${commons-io.version} + commons-lang commons-lang @@ -1053,8 +1059,8 @@ servlet-api-2.5 ${jetty.jspapi.version} - org.codehaus.jackson @@ -1280,7 +1286,7 @@ - + os.linux @@ -1429,7 +1435,7 @@ - + security @@ -1876,8 +1882,8 @@ - - @@ -1888,11 +1894,11 @@ always none - 1 + 1 - - + + parallelTests false @@ -1900,11 +1906,11 @@ once classes - 1 + 1 - - - + + + singleJVMTests false @@ -1912,14 +1918,14 @@ once none - 1 - - false - true - + 1 + + false + true + - - + + runSmallTests @@ -1928,14 +1934,14 @@ once none - 1 - - false - true - org.apache.hadoop.hbase.SmallTests + 1 + + false + true + org.apache.hadoop.hbase.SmallTests - + runMediumTests @@ -1944,13 +1950,13 @@ always - false - true - org.apache.hadoop.hbase.MediumTests + false + true + org.apache.hadoop.hbase.MediumTests - - + + runLargeTests @@ -1958,13 +1964,13 @@ always - false - true - org.apache.hadoop.hbase.LargeTests + false + true + org.apache.hadoop.hbase.LargeTests - - + + runDevTests @@ -1973,15 +1979,15 @@ once none - 1 - - false - false - org.apache.hadoop.hbase.SmallTests + 1 + + false + false + org.apache.hadoop.hbase.SmallTests org.apache.hadoop.hbase.MediumTests - - + + runAllTests @@ -1990,43 +1996,43 @@ once none - 1 - - false + 1 + + false false - org.apache.hadoop.hbase.SmallTests + org.apache.hadoop.hbase.SmallTests org.apache.hadoop.hbase.MediumTests,org.apache.hadoop.hbase.LargeTests - + skipSurefireTests false - - true + + true true - - + + localTests false - 
surefire-junit4 - 2.10 - - always - false + surefire-junit4 + 2.10 + + always + false true - + - + diff --git a/src/test/java/org/apache/hadoop/hbase/EmptyWatcher.java b/src/main/java/org/apache/hadoop/hbase/EmptyWatcher.java similarity index 92% rename from src/test/java/org/apache/hadoop/hbase/EmptyWatcher.java rename to src/main/java/org/apache/hadoop/hbase/EmptyWatcher.java index cf27ff032e9..e0e0a288992 100644 --- a/src/test/java/org/apache/hadoop/hbase/EmptyWatcher.java +++ b/src/main/java/org/apache/hadoop/hbase/EmptyWatcher.java @@ -23,9 +23,9 @@ import org.apache.zookeeper.Watcher; import org.apache.zookeeper.WatchedEvent; /** - * Class used as an empty watche for the tests + * An empty ZooKeeper watcher */ -public class EmptyWatcher implements Watcher{ +public class EmptyWatcher implements Watcher { public static EmptyWatcher instance = new EmptyWatcher(); private EmptyWatcher() {} diff --git a/src/main/java/org/apache/hadoop/hbase/HConstants.java b/src/main/java/org/apache/hadoop/hbase/HConstants.java index d22f50a42bd..d39f8a5e1cc 100644 --- a/src/main/java/org/apache/hadoop/hbase/HConstants.java +++ b/src/main/java/org/apache/hadoop/hbase/HConstants.java @@ -144,6 +144,12 @@ public final class HConstants { /** Default limit on concurrent client-side zookeeper connections */ public static final int DEFAULT_ZOOKEPER_MAX_CLIENT_CNXNS = 30; + /** Configuration key for ZooKeeper session timeout */ + public static final String ZK_SESSION_TIMEOUT = "zookeeper.session.timeout"; + + /** Default value for ZooKeeper session timeout */ + public static final int DEFAULT_ZK_SESSION_TIMEOUT = 180 * 1000; + /** Parameter name for port region server listens on. */ public static final String REGIONSERVER_PORT = "hbase.regionserver.port"; @@ -153,6 +159,10 @@ public final class HConstants { /** default port for region server web api */ public static final int DEFAULT_REGIONSERVER_INFOPORT = 60030; + /** A flag that enables automatic selection of regionserver info port */ + public static final String REGIONSERVER_INFO_PORT_AUTO = + "hbase.regionserver.info.port.auto"; + /** Parameter name for what region server interface to use. 
*/ public static final String REGION_SERVER_CLASS = "hbase.regionserver.class"; @@ -204,6 +214,10 @@ public final class HConstants { /** Used to construct the name of the compaction directory during compaction */ public static final String HREGION_COMPACTIONDIR_NAME = "compaction.dir"; + /** Conf key for the max file size after which we split the region */ + public static final String HREGION_MAX_FILESIZE = + "hbase.hregion.max.filesize"; + /** Default maximum file size */ public static final long DEFAULT_MAX_FILE_SIZE = 256 * 1024 * 1024; @@ -504,9 +518,9 @@ public final class HConstants { */ public static final String REPLICATION_ENABLE_KEY = "hbase.replication"; - public static final String + public static final String REPLICATION_SOURCE_SERVICE_CLASSNAME = "hbase.replication.source.service"; - public static final String + public static final String REPLICATION_SINK_SERVICE_CLASSNAME = "hbase.replication.sink.service"; public static final String REPLICATION_SERVICE_CLASSNAME_DEFAULT = "org.apache.hadoop.hbase.replication.regionserver.Replication"; @@ -547,6 +561,12 @@ public final class HConstants { "(" + CP_HTD_ATTR_VALUE_PARAM_KEY_PATTERN + ")=(" + CP_HTD_ATTR_VALUE_PARAM_VALUE_PATTERN + "),?"); + /** The delay when re-trying a socket operation in a loop (HBASE-4712) */ + public static final int SOCKET_RETRY_WAIT_MS = 200; + + /** Host name of the local machine */ + public static final String LOCALHOST = "localhost"; + private HConstants() { // Can't be instantiated with this ctor. } diff --git a/src/main/java/org/apache/hadoop/hbase/client/HConnectionManager.java b/src/main/java/org/apache/hadoop/hbase/client/HConnectionManager.java index 6af1f82092a..6f19d21c55f 100644 --- a/src/main/java/org/apache/hadoop/hbase/client/HConnectionManager.java +++ b/src/main/java/org/apache/hadoop/hbase/client/HConnectionManager.java @@ -493,7 +493,7 @@ public class HConnectionManager { private MasterAddressTracker masterAddressTracker; private RootRegionTracker rootRegionTracker; private ClusterId clusterId; - + private final Object metaRegionLock = new Object(); private final Object userRegionLock = new Object(); @@ -687,7 +687,7 @@ public class HConnectionManager { throw new MasterNotRunningException(errorMsg); } } - + public boolean isMasterRunning() throws MasterNotRunningException, ZooKeeperConnectionException { if (this.master == null) { @@ -800,7 +800,7 @@ public class HConnectionManager { try { ServerName servername = this.rootRegionTracker.waitRootRegionLocation(this.rpcTimeout); - LOG.debug("Lookedup root region location, connection=" + this + + LOG.debug("Looked up root region location, connection=" + this + "; serverName=" + ((servername == null)? 
"": servername.toString())); if (servername == null) return null; return new HRegionLocation(HRegionInfo.ROOT_REGIONINFO, @@ -987,7 +987,7 @@ public class HConnectionManager { throw new NoServerForRegionException("No server address listed " + "in " + Bytes.toString(parentTable) + " for region " + regionInfo.getRegionNameAsString() + " containing row " + - Bytes.toStringBinary(row)); + Bytes.toStringBinary(row)); } // Instantiate the location @@ -1072,7 +1072,7 @@ public class HConnectionManager { if (!matchingRegions.isEmpty()) { HRegionLocation possibleRegion = null; try { - possibleRegion = matchingRegions.get(matchingRegions.lastKey()); + possibleRegion = matchingRegions.get(matchingRegions.lastKey()); } catch (NoSuchElementException nsee) { LOG.warn("checkReferences() might have removed the key", nsee); } @@ -1738,7 +1738,7 @@ public class HConnectionManager { public boolean isClosed() { return this.closed; } - + @Override public boolean isAborted(){ return this.aborted; diff --git a/src/main/java/org/apache/hadoop/hbase/mapreduce/HFileOutputFormat.java b/src/main/java/org/apache/hadoop/hbase/mapreduce/HFileOutputFormat.java index ce1c45b06eb..5150590dd80 100644 --- a/src/main/java/org/apache/hadoop/hbase/mapreduce/HFileOutputFormat.java +++ b/src/main/java/org/apache/hadoop/hbase/mapreduce/HFileOutputFormat.java @@ -83,7 +83,7 @@ public class HFileOutputFormat extends FileOutputFormatSets the output key/value class to match HFileOutputFormat's requirements *
  • Sets the reducer up to perform the appropriate sorting (either KeyValueSortReducer or PutSortReducer)
  • - * + * * The user should be sure to set the map output value class to either KeyValue or Put before * running this function. */ @@ -313,7 +313,7 @@ public class HFileOutputFormat extends FileOutputFormat(), + 60, TimeUnit.SECONDS, new PriorityBlockingQueue(), new ThreadFactory() { @Override public Thread newThread(Runnable r) { diff --git a/src/main/java/org/apache/hadoop/hbase/regionserver/ConstantSizeRegionSplitPolicy.java b/src/main/java/org/apache/hadoop/hbase/regionserver/ConstantSizeRegionSplitPolicy.java index 6b814d75ce2..13b5bbf1ac1 100644 --- a/src/main/java/org/apache/hadoop/hbase/regionserver/ConstantSizeRegionSplitPolicy.java +++ b/src/main/java/org/apache/hadoop/hbase/regionserver/ConstantSizeRegionSplitPolicy.java @@ -34,17 +34,17 @@ class ConstantSizeRegionSplitPolicy extends RegionSplitPolicy { // By default we split region if a file > HConstants.DEFAULT_MAX_FILE_SIZE. if (maxFileSize == HConstants.DEFAULT_MAX_FILE_SIZE) { - maxFileSize = getConf().getLong("hbase.hregion.max.filesize", + maxFileSize = getConf().getLong(HConstants.HREGION_MAX_FILESIZE, HConstants.DEFAULT_MAX_FILE_SIZE); } this.desiredMaxFileSize = maxFileSize; } - + @Override boolean shouldSplit() { boolean force = region.shouldForceSplit(); boolean foundABigStore = false; - + for (Store store : region.getStores().values()) { // If any of the stores are unable to split (eg they contain reference files) // then don't split diff --git a/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java b/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java index bda089223d1..aae0b26de0d 100644 --- a/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java +++ b/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java @@ -1512,7 +1512,8 @@ public class HRegionServer implements HRegionInterface, HBaseRPCErrorHandler, if (port < 0) return port; String addr = this.conf.get("hbase.regionserver.info.bindAddress", "0.0.0.0"); // check if auto port bind enabled - boolean auto = this.conf.getBoolean("hbase.regionserver.info.port.auto", false); + boolean auto = this.conf.getBoolean(HConstants.REGIONSERVER_INFO_PORT_AUTO, + false); while (true) { try { this.infoServer = new InfoServer("regionserver", addr, port, false, this.conf); diff --git a/src/main/java/org/apache/hadoop/hbase/regionserver/wal/SequenceFileLogReader.java b/src/main/java/org/apache/hadoop/hbase/regionserver/wal/SequenceFileLogReader.java index 497c5d03e96..b2c2df5b502 100644 --- a/src/main/java/org/apache/hadoop/hbase/regionserver/wal/SequenceFileLogReader.java +++ b/src/main/java/org/apache/hadoop/hbase/regionserver/wal/SequenceFileLogReader.java @@ -26,13 +26,15 @@ import java.lang.Class; import java.lang.reflect.Constructor; import java.lang.reflect.Field; import java.lang.reflect.Method; - +import java.util.Arrays; + import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FSDataInputStream; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; +import org.apache.hadoop.hdfs.DFSClient.DFSInputStream; import org.apache.hadoop.io.SequenceFile; public class SequenceFileLogReader implements HLog.Reader { @@ -82,8 +84,8 @@ public class SequenceFileLogReader implements HLog.Reader { // This section can be confusing. It is specific to how HDFS works. // Let me try to break it down. This is the problem: // - // 1. 
HDFS DataNodes update the NameNode about a filename's length - // on block boundaries or when a file is closed. Therefore, + // 1. HDFS DataNodes update the NameNode about a filename's length + // on block boundaries or when a file is closed. Therefore, // if an RS dies, then the NN's fs.getLength() can be out of date // 2. this.in.available() would work, but it returns int & // therefore breaks for files > 2GB (happens on big clusters) @@ -91,7 +93,7 @@ public class SequenceFileLogReader implements HLog.Reader { // 4. DFSInputStream is wrapped 2 levels deep : this.in.in // // So, here we adjust getPos() using getFileLength() so the - // SequenceFile.Reader constructor (aka: first invocation) comes out + // SequenceFile.Reader constructor (aka: first invocation) comes out // with the correct end of the file: // this.end = in.getPos() + length; @Override @@ -104,13 +106,18 @@ public class SequenceFileLogReader implements HLog.Reader { Field fIn = FilterInputStream.class.getDeclaredField("in"); fIn.setAccessible(true); Object realIn = fIn.get(this.in); - Method getFileLength = realIn.getClass(). - getMethod("getFileLength", new Class []{}); - getFileLength.setAccessible(true); - long realLength = ((Long)getFileLength. - invoke(realIn, new Object []{})).longValue(); - assert(realLength >= this.length); - adjust = realLength - this.length; + if (realIn.getClass() == DFSInputStream.class) { + Method getFileLength = realIn.getClass(). + getDeclaredMethod("getFileLength", new Class []{}); + getFileLength.setAccessible(true); + long realLength = ((Long)getFileLength. + invoke(realIn, new Object []{})).longValue(); + assert(realLength >= this.length); + adjust = realLength - this.length; + } else { + LOG.info("Input stream class: " + realIn.getClass().getName() + + ", not adjusting length"); + } } catch(Exception e) { SequenceFileLogReader.LOG.warn( "Error while trying to get accurate file length. " + @@ -142,7 +149,7 @@ public class SequenceFileLogReader implements HLog.Reader { /** * This constructor allows a specific HLogKey implementation to override that * which would otherwise be chosen via configuration property. - * + * * @param keyClass */ public SequenceFileLogReader(Class keyClass) { @@ -189,7 +196,7 @@ public class SequenceFileLogReader implements HLog.Reader { throw new IOException(iae); } } - + WALEdit val = new WALEdit(); e = new HLog.Entry(key, val); } @@ -235,8 +242,8 @@ public class SequenceFileLogReader implements HLog.Reader { } catch(Exception e) { /* reflection fail. keep going */ } String msg = (this.path == null? "": this.path.toString()) + - ", entryStart=" + entryStart + ", pos=" + pos + - ((end == Long.MAX_VALUE) ? "" : ", end=" + end) + + ", entryStart=" + entryStart + ", pos=" + pos + + ((end == Long.MAX_VALUE) ? "" : ", end=" + end) + ", edit=" + this.edit; // Enhance via reflection so we don't change the original class type @@ -246,7 +253,7 @@ public class SequenceFileLogReader implements HLog.Reader { .newInstance(msg) .initCause(ioe); } catch(Exception e) { /* reflection fail. keep going */ } - + return ioe; } } diff --git a/src/main/java/org/apache/hadoop/hbase/util/AbstractHBaseTool.java b/src/main/java/org/apache/hadoop/hbase/util/AbstractHBaseTool.java new file mode 100644 index 00000000000..779cdc1fd54 --- /dev/null +++ b/src/main/java/org/apache/hadoop/hbase/util/AbstractHBaseTool.java @@ -0,0 +1,181 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. 
See the NOTICE file distributed with this + * work for additional information regarding copyright ownership. The ASF + * licenses this file to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations + * under the License. + */ +package org.apache.hadoop.hbase.util; + +import java.util.Set; +import java.util.TreeSet; + +import org.apache.commons.cli.BasicParser; +import org.apache.commons.cli.CommandLine; +import org.apache.commons.cli.CommandLineParser; +import org.apache.commons.cli.HelpFormatter; +import org.apache.commons.cli.Options; +import org.apache.commons.cli.ParseException; +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hbase.HBaseConfiguration; +import org.apache.hadoop.util.Tool; +import org.apache.hadoop.util.ToolRunner; + +/** + * Common base class used for HBase command-line tools. Simplifies workflow and + * command-line argument parsing. + */ +public abstract class AbstractHBaseTool implements Tool { + + private static final int EXIT_SUCCESS = 0; + private static final int EXIT_FAILURE = 1; + + private static final String HELP_OPTION = "help"; + + private static final Log LOG = LogFactory.getLog(AbstractHBaseTool.class); + + private final Options options = new Options(); + + protected Configuration conf = null; + + private static final Set requiredOptions = new TreeSet(); + + /** + * Override this to add command-line options using {@link #addOptWithArg} + * and similar methods. + */ + protected abstract void addOptions(); + + /** + * This method is called to process the options after they have been parsed. 
+ */ + protected abstract void processOptions(CommandLine cmd); + + /** The "main function" of the tool */ + protected abstract void doWork() throws Exception; + + @Override + public Configuration getConf() { + return conf; + } + + @Override + public void setConf(Configuration conf) { + this.conf = conf; + } + + @Override + public final int run(String[] args) throws Exception { + if (conf == null) { + LOG.error("Tool configuration is not initialized"); + throw new NullPointerException("conf"); + } + + CommandLine cmd; + try { + // parse the command line arguments + cmd = parseArgs(args); + } catch (ParseException e) { + LOG.error("Error when parsing command-line arguemnts", e); + printUsage(); + return EXIT_FAILURE; + } + + if (cmd.hasOption(HELP_OPTION) || !sanityCheckOptions(cmd)) { + printUsage(); + return EXIT_FAILURE; + } + + processOptions(cmd); + + try { + doWork(); + } catch (Exception e) { + LOG.error("Error running command-line tool", e); + return EXIT_FAILURE; + } + return EXIT_SUCCESS; + } + + private boolean sanityCheckOptions(CommandLine cmd) { + boolean success = true; + for (String reqOpt : requiredOptions) { + if (!cmd.hasOption(reqOpt)) { + LOG.error("Required option -" + reqOpt + " is missing"); + success = false; + } + } + return success; + } + + private CommandLine parseArgs(String[] args) throws ParseException { + options.addOption(HELP_OPTION, false, "Show usage"); + addOptions(); + CommandLineParser parser = new BasicParser(); + return parser.parse(options, args); + } + + private void printUsage() { + HelpFormatter helpFormatter = new HelpFormatter(); + helpFormatter.setWidth(80); + String usageHeader = "Options:"; + String usageFooter = ""; + String usageStr = "bin/hbase " + getClass().getName() + " "; + + helpFormatter.printHelp(usageStr, usageHeader, options, + usageFooter); + } + + protected void addRequiredOptWithArg(String opt, String description) { + requiredOptions.add(opt); + addOptWithArg(opt, description); + } + + protected void addOptNoArg(String opt, String description) { + options.addOption(opt, false, description); + } + + protected void addOptWithArg(String opt, String description) { + options.addOption(opt, true, description); + } + + /** + * Parse a number and enforce a range. + */ + public static long parseLong(String s, long minValue, long maxValue) { + long l = Long.parseLong(s); + if (l < minValue || l > maxValue) { + throw new IllegalArgumentException("The value " + l + + " is out of range [" + minValue + ", " + maxValue + "]"); + } + return l; + } + + public static int parseInt(String s, int minValue, int maxValue) { + return (int) parseLong(s, minValue, maxValue); + } + + /** Call this from the concrete tool class's main function. 
*/ + protected void doStaticMain(String args[]) { + int ret; + try { + ret = ToolRunner.run(HBaseConfiguration.create(), this, args); + } catch (Exception ex) { + LOG.error("Error running command-line tool", ex); + ret = EXIT_FAILURE; + } + System.exit(ret); + } + +} diff --git a/src/main/java/org/apache/hadoop/hbase/util/Bytes.java b/src/main/java/org/apache/hadoop/hbase/util/Bytes.java index 0e79609f83c..ead9a3bf2ad 100644 --- a/src/main/java/org/apache/hadoop/hbase/util/Bytes.java +++ b/src/main/java/org/apache/hadoop/hbase/util/Bytes.java @@ -33,6 +33,7 @@ import java.security.PrivilegedAction; import java.util.Comparator; import java.util.Iterator; +import org.apache.commons.lang.StringUtils; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.hbase.HConstants; @@ -312,7 +313,7 @@ public class Bytes { return "null"; return toStringBinary(b, 0, b.length); } - + /** * Converts the given byte buffer, from its array offset to its limit, to * a string. The position and the mark are ignored. @@ -852,7 +853,7 @@ public class Bytes { offset = putInt(result, offset, val.scale()); return putBytes(result, offset, valueBytes, 0, valueBytes.length); } - + /** * @param vint Integer to make a vint of. * @return Vint as bytes array. @@ -960,7 +961,7 @@ public class Bytes { return LexicographicalComparerHolder.BEST_COMPARER. compareTo(buffer1, offset1, length1, buffer2, offset2, length2); } - + interface Comparer { abstract public int compareTo(T buffer1, int offset1, int length1, T buffer2, int offset2, int length2); @@ -982,7 +983,7 @@ public class Bytes { static class LexicographicalComparerHolder { static final String UNSAFE_COMPARER_NAME = LexicographicalComparerHolder.class.getName() + "$UnsafeComparer"; - + static final Comparer BEST_COMPARER = getBestComparer(); /** * Returns the Unsafe-using Comparer, or falls back to the pure-Java @@ -1001,7 +1002,7 @@ public class Bytes { return lexicographicalComparerJavaImpl(); } } - + enum PureJavaComparer implements Comparer { INSTANCE; @@ -1027,7 +1028,7 @@ public class Bytes { return length1 - length2; } } - + @VisibleForTesting enum UnsafeComparer implements Comparer { INSTANCE; @@ -1164,7 +1165,7 @@ public class Bytes { if (left == null || right == null) return false; if (left.length != right.length) return false; if (left.length == 0) return true; - + // Since we're often comparing adjacent sorted data, // it's usual to have equal arrays except for the very last byte // so check that first @@ -1172,7 +1173,7 @@ public class Bytes { return compareTo(left, right) == 0; } - + public static boolean equals(final byte[] left, int leftOffset, int leftLen, final byte[] right, int rightOffset, int rightLen) { // short circuit case @@ -1188,7 +1189,7 @@ public class Bytes { if (leftLen == 0) { return true; } - + // Since we're often comparing adjacent sorted data, // it's usual to have equal arrays except for the very last byte // so check that first @@ -1197,7 +1198,7 @@ public class Bytes { return LexicographicalComparerHolder.BEST_COMPARER. compareTo(left, leftOffset, leftLen, right, rightOffset, rightLen) == 0; } - + /** * Return true if the byte array on the right is a prefix of the byte @@ -1207,7 +1208,7 @@ public class Bytes { return bytes != null && prefix != null && bytes.length >= prefix.length && LexicographicalComparerHolder.BEST_COMPARER. 
- compareTo(bytes, 0, prefix.length, prefix, 0, prefix.length) == 0; + compareTo(bytes, 0, prefix.length, prefix, 0, prefix.length) == 0; } /** @@ -1379,7 +1380,7 @@ public class Bytes { */ public static Iterable iterateOnSplits( final byte[] a, final byte[]b, boolean inclusive, final int num) - { + { byte [] aPadded; byte [] bPadded; if (a.length < b.length) { @@ -1419,7 +1420,7 @@ public class Bytes { final Iterator iterator = new Iterator() { private int i = -1; - + @Override public boolean hasNext() { return i < num+1; @@ -1430,7 +1431,7 @@ public class Bytes { i++; if (i == 0) return a; if (i == num + 1) return b; - + BigInteger curBI = startBI.add(intervalBI.multiply(BigInteger.valueOf(i))); byte [] padded = curBI.toByteArray(); if (padded[1] == 0) @@ -1444,9 +1445,9 @@ public class Bytes { public void remove() { throw new UnsupportedOperationException(); } - + }; - + return new Iterable() { @Override public Iterator iterator() { @@ -1628,7 +1629,7 @@ public class Bytes { /** * Reads a fixed-size field and interprets it as a string padded with zeros. */ - public static String readStringFixedSize(final DataInput in, int size) + public static String readStringFixedSize(final DataInput in, int size) throws IOException { byte[] b = new byte[size]; in.readFully(b); diff --git a/src/main/java/org/apache/hadoop/hbase/util/HMerge.java b/src/main/java/org/apache/hadoop/hbase/util/HMerge.java index 440174c9b32..a685aa77c39 100644 --- a/src/main/java/org/apache/hadoop/hbase/util/HMerge.java +++ b/src/main/java/org/apache/hadoop/hbase/util/HMerge.java @@ -145,7 +145,7 @@ class HMerge { throws IOException { this.conf = conf; this.fs = fs; - this.maxFilesize = conf.getLong("hbase.hregion.max.filesize", + this.maxFilesize = conf.getLong(HConstants.HREGION_MAX_FILESIZE, HConstants.DEFAULT_MAX_FILE_SIZE); this.tabledir = new Path( @@ -435,4 +435,4 @@ class HMerge { } } } -} \ No newline at end of file +} diff --git a/src/main/java/org/apache/hadoop/hbase/util/Keying.java b/src/main/java/org/apache/hadoop/hbase/util/Keying.java index d3b83f4e51d..2e3d02761a1 100644 --- a/src/main/java/org/apache/hadoop/hbase/util/Keying.java +++ b/src/main/java/org/apache/hadoop/hbase/util/Keying.java @@ -112,4 +112,5 @@ public class Keying { } return sb.toString(); } -} \ No newline at end of file + +} diff --git a/src/main/java/org/apache/hadoop/hbase/util/RegionSplitter.java b/src/main/java/org/apache/hadoop/hbase/util/RegionSplitter.java index 46fc7d0bf61..315e0cf7d53 100644 --- a/src/main/java/org/apache/hadoop/hbase/util/RegionSplitter.java +++ b/src/main/java/org/apache/hadoop/hbase/util/RegionSplitter.java @@ -79,7 +79,7 @@ import com.google.common.collect.Sets; *

    * Question: How do I turn off automatic splitting?
    * Answer: Automatic splitting is determined by the configuration value
   - * "hbase.hregion.max.filesize". It is not recommended that you set this
   + * HConstants.HREGION_MAX_FILESIZE. It is not recommended that you set this
    * to Long.MAX_VALUE in case you forget about manual splits. A suggested setting
    * is 100GB, which would result in > 1hr major compactions if reached.
    *

    diff --git a/src/main/java/org/apache/hadoop/hbase/zookeeper/ZKConfig.java b/src/main/java/org/apache/hadoop/hbase/zookeeper/ZKConfig.java index bf5de8df59b..6d955acfe18 100644 --- a/src/main/java/org/apache/hadoop/hbase/zookeeper/ZKConfig.java +++ b/src/main/java/org/apache/hadoop/hbase/zookeeper/ZKConfig.java @@ -96,7 +96,7 @@ public class ZKConfig { int leaderPort = conf.getInt("hbase.zookeeper.leaderport", 3888); final String[] serverHosts = conf.getStrings(HConstants.ZOOKEEPER_QUORUM, - "localhost"); + HConstants.LOCALHOST); for (int i = 0; i < serverHosts.length; ++i) { String serverHost = serverHosts[i]; String address = serverHost + ":" + peerPort + ":" + leaderPort; @@ -160,7 +160,7 @@ public class ZKConfig { // Special case for 'hbase.cluster.distributed' property being 'true' if (key.startsWith("server.")) { if (conf.get(HConstants.CLUSTER_DISTRIBUTED).equals(HConstants.CLUSTER_IS_DISTRIBUTED) - && value.startsWith("localhost")) { + && value.startsWith(HConstants.LOCALHOST)) { String msg = "The server in zoo.cfg cannot be set to localhost " + "in a fully-distributed setup because it won't be reachable. " + "See \"Getting Started\" for more information."; diff --git a/src/main/java/org/apache/hadoop/hbase/zookeeper/ZKUtil.java b/src/main/java/org/apache/hadoop/hbase/zookeeper/ZKUtil.java index 132d1c2ef89..157bffaf2a6 100644 --- a/src/main/java/org/apache/hadoop/hbase/zookeeper/ZKUtil.java +++ b/src/main/java/org/apache/hadoop/hbase/zookeeper/ZKUtil.java @@ -33,15 +33,18 @@ import org.apache.commons.lang.StringUtils; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hbase.EmptyWatcher; import org.apache.hadoop.hbase.HConstants; import org.apache.hadoop.hbase.executor.RegionTransitionData; import org.apache.hadoop.hbase.util.Bytes; +import org.apache.hadoop.hbase.util.Threads; import org.apache.zookeeper.AsyncCallback; import org.apache.zookeeper.CreateMode; import org.apache.zookeeper.KeeperException; -import org.apache.zookeeper.Watcher; import org.apache.zookeeper.KeeperException.NoNodeException; +import org.apache.zookeeper.Watcher; import org.apache.zookeeper.ZooDefs.Ids; +import org.apache.zookeeper.ZooKeeper; import org.apache.zookeeper.data.ACL; import org.apache.zookeeper.data.Stat; @@ -91,7 +94,8 @@ public class ZKUtil { if(ensemble == null) { throw new IOException("Unable to determine ZooKeeper ensemble"); } - int timeout = conf.getInt("zookeeper.session.timeout", 180 * 1000); + int timeout = conf.getInt(HConstants.ZK_SESSION_TIMEOUT, + HConstants.DEFAULT_ZK_SESSION_TIMEOUT); LOG.debug(descriptor + " opening connection to ZooKeeper with ensemble (" + ensemble + ")"); int retry = conf.getInt("zookeeper.recovery.retry", 3); @@ -1116,4 +1120,46 @@ public class ZKUtil { RegionTransitionData.fromBytes(data).toString() : StringUtils.abbreviate(Bytes.toStringBinary(data), 32))))); } + + /** + * Waits for HBase installation's base (parent) znode to become available. 
+ * @throws IOException on ZK errors + */ + public static void waitForBaseZNode(Configuration conf) throws IOException { + LOG.info("Waiting until the base znode is available"); + String parentZNode = conf.get(HConstants.ZOOKEEPER_ZNODE_PARENT, + HConstants.DEFAULT_ZOOKEEPER_ZNODE_PARENT); + ZooKeeper zk = new ZooKeeper(ZKConfig.getZKQuorumServersString(conf), + conf.getInt(HConstants.ZK_SESSION_TIMEOUT, + HConstants.DEFAULT_ZK_SESSION_TIMEOUT), EmptyWatcher.instance); + + final int maxTimeMs = 10000; + final int maxNumAttempts = maxTimeMs / HConstants.SOCKET_RETRY_WAIT_MS; + + KeeperException keeperEx = null; + try { + try { + for (int attempt = 0; attempt < maxNumAttempts; ++attempt) { + try { + if (zk.exists(parentZNode, false) != null) { + LOG.info("Parent znode exists: " + parentZNode); + keeperEx = null; + break; + } + } catch (KeeperException e) { + keeperEx = e; + } + Threads.sleepWithoutInterrupt(HConstants.SOCKET_RETRY_WAIT_MS); + } + } finally { + zk.close(); + } + } catch (InterruptedException ex) { + Thread.currentThread().interrupt(); + } + + if (keeperEx != null) { + throw new IOException(keeperEx); + } + } } diff --git a/src/test/java/org/apache/hadoop/hbase/HBaseTestingUtility.java b/src/test/java/org/apache/hadoop/hbase/HBaseTestingUtility.java index 66d808f8c79..68fb6505946 100644 --- a/src/test/java/org/apache/hadoop/hbase/HBaseTestingUtility.java +++ b/src/test/java/org/apache/hadoop/hbase/HBaseTestingUtility.java @@ -25,7 +25,10 @@ import java.io.File; import java.io.IOException; import java.io.OutputStream; import java.lang.reflect.Field; +import java.net.InetAddress; import java.net.ServerSocket; +import java.net.Socket; +import java.net.UnknownHostException; import java.security.MessageDigest; import java.util.ArrayList; import java.util.Arrays; @@ -64,6 +67,8 @@ import org.apache.hadoop.hbase.regionserver.StoreFile; import org.apache.hadoop.hbase.security.User; import org.apache.hadoop.hbase.util.Bytes; import org.apache.hadoop.hbase.util.FSUtils; +import org.apache.hadoop.hbase.util.Keying; +import org.apache.hadoop.hbase.util.RegionSplitter; import org.apache.hadoop.hbase.util.Threads; import org.apache.hadoop.hbase.util.Writables; import org.apache.hadoop.hbase.zookeeper.MiniZooKeeperCluster; @@ -73,7 +78,6 @@ import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher; import org.apache.hadoop.hdfs.DFSClient; import org.apache.hadoop.hdfs.DistributedFileSystem; import org.apache.hadoop.hdfs.MiniDFSCluster; -import org.apache.hadoop.hdfs.server.namenode.NameNode; import org.apache.hadoop.mapred.MiniMRCluster; import org.apache.zookeeper.KeeperException; import org.apache.zookeeper.KeeperException.NodeExistsException; @@ -93,6 +97,13 @@ public class HBaseTestingUtility { private static final Log LOG = LogFactory.getLog(HBaseTestingUtility.class); private Configuration conf; private MiniZooKeeperCluster zkCluster = null; + + /** + * The default number of regions per regionserver when creating a pre-split + * table. + */ + private static int DEFAULT_REGIONS_PER_SERVER = 5; + /** * Set if we were passed a zkCluster. If so, we won't shutdown zk as * part of general shutdown. @@ -394,7 +405,7 @@ public class HBaseTestingUtility { public MiniZooKeeperCluster startMiniZKCluster() throws Exception { return startMiniZKCluster(1); } - + /** * Call this if you only want a zk cluster. * @param zooKeeperServerNum @@ -403,18 +414,18 @@ public class HBaseTestingUtility { * @see #shutdownMiniZKCluster() * @return zk cluster started. 
*/ - public MiniZooKeeperCluster startMiniZKCluster(int zooKeeperServerNum) + public MiniZooKeeperCluster startMiniZKCluster(int zooKeeperServerNum) throws Exception { File zkClusterFile = new File(getClusterTestDir().toString()); return startMiniZKCluster(zkClusterFile, zooKeeperServerNum); } - + private MiniZooKeeperCluster startMiniZKCluster(final File dir) throws Exception { return startMiniZKCluster(dir,1); } - - private MiniZooKeeperCluster startMiniZKCluster(final File dir, + + private MiniZooKeeperCluster startMiniZKCluster(final File dir, int zooKeeperServerNum) throws Exception { if (this.zkCluster != null) { @@ -469,7 +480,7 @@ public class HBaseTestingUtility { return startMiniCluster(1, numSlaves); } - + /** * start minicluster * @throws Exception @@ -481,8 +492,8 @@ public class HBaseTestingUtility { throws Exception { return startMiniCluster(numMasters, numSlaves, null); } - - + + /** * Start up a minicluster of hbase, optionally dfs, and zookeeper. * Modifies Configuration. Homes the cluster data directory under a random @@ -514,7 +525,7 @@ public class HBaseTestingUtility { if ( dataNodeHosts != null && dataNodeHosts.length != 0) { numDataNodes = dataNodeHosts.length; } - + LOG.info("Starting up minicluster with " + numMasters + " master(s) and " + numSlaves + " regionserver(s) and " + numDataNodes + " datanode(s)"); @@ -1557,7 +1568,7 @@ public class HBaseTestingUtility { return getFromStoreFile(store,get); } - + /** * Gets a ZooKeeperWatcher. * @param TEST_UTIL @@ -1582,7 +1593,7 @@ public class HBaseTestingUtility { }); return zkw; } - + /** * Creates a znode with OPENED state. * @param TEST_UTIL @@ -1606,7 +1617,7 @@ public class HBaseTestingUtility { version); return zkw; } - + public static void assertKVListsEqual(String additionalMsg, final List expected, final List actual) { @@ -1644,7 +1655,7 @@ public class HBaseTestingUtility { public String getClusterKey() { return conf.get(HConstants.ZOOKEEPER_QUORUM) + ":" - + conf.get(HConstants.ZOOKEEPER_CLIENT_PORT) + ":" + + conf.get(HConstants.ZOOKEEPER_CLIENT_PORT) + ":" + conf.get(HConstants.ZOOKEEPER_ZNODE_PARENT, HConstants.DEFAULT_ZOOKEEPER_ZNODE_PARENT); } @@ -1739,7 +1750,7 @@ public class HBaseTestingUtility { return MIN_RANDOM_PORT + new Random().nextInt(MAX_RANDOM_PORT - MIN_RANDOM_PORT); } - + public static int randomFreePort() { int port = 0; do { @@ -1754,4 +1765,77 @@ public class HBaseTestingUtility { return port; } + public static void waitForHostPort(String host, int port) + throws IOException { + final int maxTimeMs = 10000; + final int maxNumAttempts = maxTimeMs / HConstants.SOCKET_RETRY_WAIT_MS; + IOException savedException = null; + LOG.info("Waiting for server at " + host + ":" + port); + for (int attempt = 0; attempt < maxNumAttempts; ++attempt) { + try { + Socket sock = new Socket(InetAddress.getByName(host), port); + sock.close(); + savedException = null; + LOG.info("Server at " + host + ":" + port + " is available"); + break; + } catch (UnknownHostException e) { + throw new IOException("Failed to look up " + host, e); + } catch (IOException e) { + savedException = e; + } + Threads.sleepWithoutInterrupt(HConstants.SOCKET_RETRY_WAIT_MS); + } + + if (savedException != null) { + throw savedException; + } + } + + /** + * Creates a pre-split table for load testing. If the table already exists, + * logs a warning and continues. 
+ * @return the number of regions the table was split into + */ + public static int createPreSplitLoadTestTable(Configuration conf, + byte[] tableName, byte[] columnFamily) throws IOException { + HTableDescriptor desc = new HTableDescriptor(tableName); + desc.addFamily(new HColumnDescriptor(columnFamily)); + + int totalNumberOfRegions = 0; + try { + HBaseAdmin admin = new HBaseAdmin(conf); + + // create a table a pre-splits regions. + // The number of splits is set as: + // region servers * regions per region server). + int numberOfServers = admin.getClusterStatus().getServers().size(); + if (numberOfServers == 0) { + throw new IllegalStateException("No live regionservers"); + } + + totalNumberOfRegions = numberOfServers * DEFAULT_REGIONS_PER_SERVER; + LOG.info("Number of live regionservers: " + numberOfServers + ", " + + "pre-splitting table into " + totalNumberOfRegions + " regions " + + "(default regions per server: " + DEFAULT_REGIONS_PER_SERVER + ")"); + + byte[][] splits = new RegionSplitter.HexStringSplit().split( + totalNumberOfRegions); + + admin.createTable(desc, splits); + } catch (MasterNotRunningException e) { + LOG.error("Master not running", e); + throw new IOException(e); + } catch (TableExistsException e) { + LOG.warn("Table " + Bytes.toStringBinary(tableName) + + " already exists, continuing"); + } + return totalNumberOfRegions; + } + + public static int getMetaRSPort(Configuration conf) throws IOException { + HTable table = new HTable(conf, HConstants.META_TABLE_NAME); + HRegionLocation hloc = table.getRegionLocation(Bytes.toBytes("")); + return hloc.getPort(); + } + } diff --git a/src/test/java/org/apache/hadoop/hbase/client/TestAdmin.java b/src/test/java/org/apache/hadoop/hbase/client/TestAdmin.java index c0b09224b48..62d08bdc71e 100644 --- a/src/test/java/org/apache/hadoop/hbase/client/TestAdmin.java +++ b/src/test/java/org/apache/hadoop/hbase/client/TestAdmin.java @@ -96,7 +96,7 @@ public class TestAdmin { @Test public void testDeleteEditUnknownColumnFamilyAndOrTable() throws IOException { - // Test we get exception if we try to + // Test we get exception if we try to final String nonexistent = "nonexistent"; HColumnDescriptor nonexistentHcd = new HColumnDescriptor(nonexistent); Exception exception = null; @@ -312,7 +312,7 @@ public class TestAdmin { * @throws IOException * @throws InterruptedException */ - @Test + @Test public void testOnlineChangeTableSchema() throws IOException, InterruptedException { final byte [] tableName = Bytes.toBytes("changeTableSchemaOnline"); TEST_UTIL.getMiniHBaseCluster().getMaster().getConfiguration().setBoolean( @@ -398,7 +398,7 @@ public class TestAdmin { this.admin.listTables(); assertFalse(this.admin.tableExists(tableName)); } - + @Test public void testShouldFailOnlineSchemaUpdateIfOnlineSchemaIsNotEnabled() throws Exception { @@ -699,7 +699,7 @@ public class TestAdmin { /** * Test round-robin assignment on enableTable. - * + * * @throws IOException */ @Test @@ -751,7 +751,7 @@ public class TestAdmin { assertTrue(entryList.size() == 3); assertTrue((entryList.get(2).getValue() - entryList.get(0).getValue()) < 2); } - + /** * Multi-family scenario. Tests forcing split from client and * having scanners successfully ride over split. 
@@ -1223,7 +1223,7 @@ public class TestAdmin { } catch (IllegalArgumentException e) { } } - + @Test public void testCloseRegionWhenServerNameIsEmpty() throws Exception { @@ -1312,12 +1312,12 @@ public class TestAdmin { this.admin.deleteTable(tableName); } } - + /** * For HBASE-2556 * @throws IOException - */ + */ @Test public void testGetTableRegions() throws IOException { @@ -1336,11 +1336,11 @@ public class TestAdmin { admin.createTable(desc, startKey, endKey, expectedRegions); List RegionInfos = admin.getTableRegions(tableName); - + assertEquals("Tried to create " + expectedRegions + " regions " + "but only found " + RegionInfos.size(), expectedRegions, RegionInfos.size()); - + } @Test @@ -1372,7 +1372,7 @@ public class TestAdmin { private void setUpforLogRolling() { // Force a region split after every 768KB - TEST_UTIL.getConfiguration().setLong("hbase.hregion.max.filesize", + TEST_UTIL.getConfiguration().setLong(HConstants.HREGION_MAX_FILESIZE, 768L * 1024L); // We roll the log after every 32 writes @@ -1415,7 +1415,7 @@ public class TestAdmin { TEST_UTIL.getConfiguration().setInt( "hbase.regionserver.hlog.lowreplication.rolllimit", 3); } - + private HRegionServer startAndWriteData(String tableName, byte[] value) throws IOException { // When the META table can be opened, the region servers are running @@ -1446,7 +1446,7 @@ public class TestAdmin { } return regionServer; } - + /** * HBASE-4417 checkHBaseAvailable() doesn't close zk connections */ @@ -1457,5 +1457,5 @@ public class TestAdmin { HBaseAdmin.checkHBaseAvailable(conf); } } - + } diff --git a/src/test/java/org/apache/hadoop/hbase/coprocessor/TestCoprocessorInterface.java b/src/test/java/org/apache/hadoop/hbase/coprocessor/TestCoprocessorInterface.java index e45ad85a65d..3e14626a4b3 100644 --- a/src/test/java/org/apache/hadoop/hbase/coprocessor/TestCoprocessorInterface.java +++ b/src/test/java/org/apache/hadoop/hbase/coprocessor/TestCoprocessorInterface.java @@ -54,11 +54,11 @@ public class TestCoprocessorInterface extends HBaseTestCase { private static class CustomScanner implements RegionScanner { private RegionScanner delegate; - + public CustomScanner(RegionScanner delegate) { this.delegate = delegate; } - + @Override public boolean next(List results) throws IOException { return delegate.next(results); @@ -83,9 +83,9 @@ public class TestCoprocessorInterface extends HBaseTestCase { public boolean isFilterDone() { return delegate.isFilterDone(); } - + } - + public static class CoprocessorImpl extends BaseRegionObserver { private boolean startCalled; @@ -195,11 +195,11 @@ public class TestCoprocessorInterface extends HBaseTestCase { addContent(region, fam3); region.flushcache(); } - + region.compactStores(); byte [] splitRow = region.checkSplit(); - + assertNotNull(splitRow); HRegion [] regions = split(region, splitRow); for (int i = 0; i < regions.length; i++) { @@ -216,7 +216,7 @@ public class TestCoprocessorInterface extends HBaseTestCase { assertTrue(scanner instanceof CustomScanner); // this would throw an exception before HBASE-4197 scanner.next(new ArrayList()); - + assertTrue("Coprocessor not started", ((CoprocessorImpl)c).wasStarted()); assertTrue("Coprocessor not stopped", ((CoprocessorImpl)c).wasStopped()); assertTrue(((CoprocessorImpl)c).wasOpened()); @@ -303,7 +303,7 @@ public class TestCoprocessorInterface extends HBaseTestCase { TEST_UTIL.getConfiguration().setLong("hbase.client.pause", 15 * 1000); // This size should make it so we always split using the addContent // below. 
After adding all data, the first region is 1.3M - TEST_UTIL.getConfiguration().setLong("hbase.hregion.max.filesize", + TEST_UTIL.getConfiguration().setLong(HConstants.HREGION_MAX_FILESIZE, 1024 * 128); TEST_UTIL.getConfiguration().setBoolean("hbase.testing.nocluster", true); diff --git a/src/test/java/org/apache/hadoop/hbase/mapreduce/TestHFileOutputFormat.java b/src/test/java/org/apache/hadoop/hbase/mapreduce/TestHFileOutputFormat.java index 595e383b9cf..9be934f2cb9 100644 --- a/src/test/java/org/apache/hadoop/hbase/mapreduce/TestHFileOutputFormat.java +++ b/src/test/java/org/apache/hadoop/hbase/mapreduce/TestHFileOutputFormat.java @@ -90,18 +90,18 @@ public class TestHFileOutputFormat { = { Bytes.add(PerformanceEvaluation.FAMILY_NAME, Bytes.toBytes("-A")) , Bytes.add(PerformanceEvaluation.FAMILY_NAME, Bytes.toBytes("-B"))}; private static final byte[] TABLE_NAME = Bytes.toBytes("TestTable"); - + private HBaseTestingUtility util = new HBaseTestingUtility(); - + private static Log LOG = LogFactory.getLog(TestHFileOutputFormat.class); - + /** * Simple mapper that makes KeyValue output. */ static class RandomKVGeneratingMapper extends Mapper { - + private int keyLength; private static final int KEYLEN_DEFAULT=10; private static final String KEYLEN_CONF="randomkv.key.length"; @@ -109,12 +109,12 @@ public class TestHFileOutputFormat { private int valLength; private static final int VALLEN_DEFAULT=10; private static final String VALLEN_CONF="randomkv.val.length"; - + @Override protected void setup(Context context) throws IOException, InterruptedException { super.setup(context); - + Configuration conf = context.getConfiguration(); keyLength = conf.getInt(KEYLEN_CONF, KEYLEN_DEFAULT); valLength = conf.getInt(VALLEN_CONF, VALLEN_DEFAULT); @@ -129,7 +129,7 @@ public class TestHFileOutputFormat { byte keyBytes[] = new byte[keyLength]; byte valBytes[] = new byte[valLength]; - + int taskId = context.getTaskAttemptID().getTaskID().getId(); assert taskId < Byte.MAX_VALUE : "Unit tests dont support > 127 tasks!"; @@ -155,8 +155,8 @@ public class TestHFileOutputFormat { public void cleanupDir() throws IOException { util.cleanupTestDir(); } - - + + private void setupRandomGeneratorMapper(Job job) { job.setInputFormatClass(NMapInputFormat.class); job.setMapperClass(RandomKVGeneratingMapper.class); @@ -310,22 +310,22 @@ public class TestHFileOutputFormat { Configuration conf = util.getConfiguration(); Path testDir = util.getDataTestDir("testWritingPEData"); FileSystem fs = testDir.getFileSystem(conf); - + // Set down this value or we OOME in eclipse. conf.setInt("io.sort.mb", 20); // Write a few files. - conf.setLong("hbase.hregion.max.filesize", 64 * 1024); - + conf.setLong(HConstants.HREGION_MAX_FILESIZE, 64 * 1024); + Job job = new Job(conf, "testWritingPEData"); setupRandomGeneratorMapper(job); // This partitioner doesn't work well for number keys but using it anyways // just to demonstrate how to configure it. byte[] startKey = new byte[RandomKVGeneratingMapper.KEYLEN_DEFAULT]; byte[] endKey = new byte[RandomKVGeneratingMapper.KEYLEN_DEFAULT]; - + Arrays.fill(startKey, (byte)0); Arrays.fill(endKey, (byte)0xff); - + job.setPartitionerClass(SimpleTotalOrderPartitioner.class); // Set start and end rows for partitioner. 
SimpleTotalOrderPartitioner.setStartKey(job.getConfiguration(), startKey); @@ -333,13 +333,13 @@ public class TestHFileOutputFormat { job.setReducerClass(KeyValueSortReducer.class); job.setOutputFormatClass(HFileOutputFormat.class); job.setNumReduceTasks(4); - + FileOutputFormat.setOutputPath(job, testDir); assertTrue(job.waitForCompletion(false)); FileStatus [] files = fs.listStatus(testDir); assertTrue(files.length > 0); } - + @Test public void testJobConfiguration() throws Exception { Job job = new Job(); @@ -369,13 +369,13 @@ public class TestHFileOutputFormat { public void testMRIncrementalLoadWithSplit() throws Exception { doIncrementalLoadTest(true); } - + private void doIncrementalLoadTest( boolean shouldChangeRegions) throws Exception { Configuration conf = util.getConfiguration(); Path testDir = util.getDataTestDir("testLocalMRIncrementalLoad"); byte[][] startKeys = generateRandomStartKeys(5); - + try { util.startMiniCluster(); HBaseAdmin admin = new HBaseAdmin(conf); @@ -392,7 +392,7 @@ public class TestHFileOutputFormat { // This doesn't write into the table, just makes files assertEquals("HFOF should not touch actual table", 0, util.countRows(table)); - + // Make sure that a directory was created for every CF int dir = 0; @@ -424,10 +424,10 @@ public class TestHFileOutputFormat { LOG.info("Waiting for new region assignment to happen"); } } - + // Perform the actual load new LoadIncrementalHFiles(conf).doBulkLoad(testDir, table); - + // Ensure data shows up int expectedRows = NMapInputFormat.getNumMapTasks(conf) * ROWSPERSPLIT; assertEquals("LoadIncrementalHFiles should put expected data in table", @@ -446,12 +446,12 @@ public class TestHFileOutputFormat { } results.close(); String tableDigestBefore = util.checksumRows(table); - + // Cause regions to reopen admin.disableTable(TABLE_NAME); while (!admin.isTableDisabled(TABLE_NAME)) { Thread.sleep(200); - LOG.info("Waiting for table to disable"); + LOG.info("Waiting for table to disable"); } admin.enableTable(TABLE_NAME); util.waitTableAvailable(TABLE_NAME, 30000); @@ -475,15 +475,15 @@ public class TestHFileOutputFormat { assertEquals(table.getRegionsInfo().size(), job.getNumReduceTasks()); - + assertTrue(job.waitForCompletion(true)); } - + /** * Test for * {@link HFileOutputFormat#createFamilyCompressionMap(Configuration)}. 
Tests * that the compression map is correctly deserialized from configuration - * + * * @throws IOException */ @Test @@ -549,7 +549,7 @@ public class TestHFileOutputFormat { } return familyToCompression; } - + /** * Test that {@link HFileOutputFormat} RecordWriter uses compression settings * from the column family descriptor @@ -597,7 +597,7 @@ public class TestHFileOutputFormat { // Make sure that a directory was created for every CF FileSystem fileSystem = dir.getFileSystem(conf); - + // commit so that the filesystem has one directory per column family hof.getOutputCommitter(context).commitTask(context); for (byte[] family : FAMILIES) { @@ -682,7 +682,7 @@ public class TestHFileOutputFormat { } } } - + @Test public void testExcludeMinorCompaction() throws Exception { Configuration conf = util.getConfiguration(); @@ -775,9 +775,9 @@ public class TestHFileOutputFormat { public static void main(String args[]) throws Exception { new TestHFileOutputFormat().manualTest(args); } - + public void manualTest(String args[]) throws Exception { - Configuration conf = HBaseConfiguration.create(); + Configuration conf = HBaseConfiguration.create(); util = new HBaseTestingUtility(conf); if ("newtable".equals(args[0])) { byte[] tname = args[1].getBytes(); diff --git a/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegion.java b/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegion.java index 0f8e380d514..00bb18c9b99 100644 --- a/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegion.java +++ b/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegion.java @@ -3278,7 +3278,7 @@ public class TestHRegion extends HBaseTestCase { // This size should make it so we always split using the addContent // below. After adding all data, the first region is 1.3M - conf.setLong("hbase.hregion.max.filesize", 1024 * 128); + conf.setLong(HConstants.HREGION_MAX_FILESIZE, 1024 * 128); return conf; } diff --git a/src/test/java/org/apache/hadoop/hbase/regionserver/TestRegionSplitPolicy.java b/src/test/java/org/apache/hadoop/hbase/regionserver/TestRegionSplitPolicy.java index 9769e014a34..c86fb27ed55 100644 --- a/src/test/java/org/apache/hadoop/hbase/regionserver/TestRegionSplitPolicy.java +++ b/src/test/java/org/apache/hadoop/hbase/regionserver/TestRegionSplitPolicy.java @@ -48,36 +48,36 @@ public class TestRegionSplitPolicy { mockRegion = Mockito.mock(HRegion.class); Mockito.doReturn(htd).when(mockRegion).getTableDesc(); Mockito.doReturn(hri).when(mockRegion).getRegionInfo(); - + stores = new TreeMap(Bytes.BYTES_COMPARATOR); Mockito.doReturn(stores).when(mockRegion).getStores(); } - + @Test public void testCreateDefault() throws IOException { - conf.setLong("hbase.hregion.max.filesize", 1234L); - + conf.setLong(HConstants.HREGION_MAX_FILESIZE, 1234L); + // Using a default HTD, should pick up the file size from // configuration. 
ConstantSizeRegionSplitPolicy policy = (ConstantSizeRegionSplitPolicy)RegionSplitPolicy.create( mockRegion, conf); assertEquals(1234L, policy.getDesiredMaxFileSize()); - + // If specified in HTD, should use that htd.setMaxFileSize(9999L); policy = (ConstantSizeRegionSplitPolicy)RegionSplitPolicy.create( mockRegion, conf); - assertEquals(9999L, policy.getDesiredMaxFileSize()); + assertEquals(9999L, policy.getDesiredMaxFileSize()); } - + @Test public void testConstantSizePolicy() throws IOException { htd.setMaxFileSize(1024L); - + ConstantSizeRegionSplitPolicy policy = (ConstantSizeRegionSplitPolicy)RegionSplitPolicy.create(mockRegion, conf); - + // For no stores, should not split assertFalse(policy.shouldSplit()); @@ -86,9 +86,9 @@ public class TestRegionSplitPolicy { Mockito.doReturn(2000L).when(mockStore).getSize(); Mockito.doReturn(true).when(mockStore).canSplit(); stores.put(new byte[]{1}, mockStore); - + assertTrue(policy.shouldSplit()); - + // Act as if there's a reference file or some other reason it can't split. // This should prevent splitting even though it's big enough. Mockito.doReturn(false).when(mockStore).canSplit(); @@ -96,26 +96,26 @@ public class TestRegionSplitPolicy { // Reset splittability after above Mockito.doReturn(true).when(mockStore).canSplit(); - + // Set to a small size but turn on forceSplit. Should result in a split. Mockito.doReturn(true).when(mockRegion).shouldForceSplit(); Mockito.doReturn(100L).when(mockStore).getSize(); assertTrue(policy.shouldSplit()); - + // Turn off forceSplit, should not split Mockito.doReturn(false).when(mockRegion).shouldForceSplit(); assertFalse(policy.shouldSplit()); } - + @Test public void testGetSplitPoint() throws IOException { ConstantSizeRegionSplitPolicy policy = (ConstantSizeRegionSplitPolicy)RegionSplitPolicy.create(mockRegion, conf); - + // For no stores, should not split assertFalse(policy.shouldSplit()); assertNull(policy.getSplitPoint()); - + // Add a store above the requisite size. Should split. Store mockStore = Mockito.mock(Store.class); Mockito.doReturn(2000L).when(mockStore).getSize(); @@ -126,7 +126,7 @@ public class TestRegionSplitPolicy { assertEquals("store 1 split", Bytes.toString(policy.getSplitPoint())); - + // Add a bigger store. 
The split point should come from that one Store mockStore2 = Mockito.mock(Store.class); Mockito.doReturn(4000L).when(mockStore2).getSize(); @@ -134,7 +134,7 @@ public class TestRegionSplitPolicy { Mockito.doReturn(Bytes.toBytes("store 2 split")) .when(mockStore2).getSplitPoint(); stores.put(new byte[]{2}, mockStore2); - + assertEquals("store 2 split", Bytes.toString(policy.getSplitPoint())); } diff --git a/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestLogRolling.java b/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestLogRolling.java index 9779cac5df0..dfabcd0b05f 100644 --- a/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestLogRolling.java +++ b/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestLogRolling.java @@ -115,7 +115,7 @@ public class TestLogRolling { public static void setUpBeforeClass() throws Exception { /**** configuration for testLogRolling ****/ // Force a region split after every 768KB - TEST_UTIL.getConfiguration().setLong("hbase.hregion.max.filesize", 768L * 1024L); + TEST_UTIL.getConfiguration().setLong(HConstants.HREGION_MAX_FILESIZE, 768L * 1024L); // We roll the log after every 32 writes TEST_UTIL.getConfiguration().setInt("hbase.regionserver.maxlogentries", 32); @@ -281,7 +281,7 @@ public class TestLogRolling { } } } - + /** * Give me the HDFS pipeline for this log file */ @@ -308,9 +308,9 @@ public class TestLogRolling { * Requires an HDFS jar with HDFS-826 & syncFs() support (HDFS-200) * @throws IOException * @throws InterruptedException - * @throws InvocationTargetException + * @throws InvocationTargetException * @throws IllegalAccessException - * @throws IllegalArgumentException + * @throws IllegalArgumentException */ @Test public void testLogRollOnDatanodeDeath() throws IOException, diff --git a/src/test/java/org/apache/hadoop/hbase/util/LoadTestKVGenerator.java b/src/test/java/org/apache/hadoop/hbase/util/LoadTestKVGenerator.java new file mode 100644 index 00000000000..ba38f6ab1ba --- /dev/null +++ b/src/test/java/org/apache/hadoop/hbase/util/LoadTestKVGenerator.java @@ -0,0 +1,101 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with this + * work for additional information regarding copyright ownership. The ASF + * licenses this file to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations + * under the License. + */ +package org.apache.hadoop.hbase.util; + +import java.util.Random; + +import org.apache.hadoop.hbase.util.Bytes; +import org.apache.hadoop.hbase.util.MD5Hash; + +/** + * A generator of random keys and values for load testing. Keys are generated + * by converting numeric indexes to strings and prefixing them with an MD5 + * hash. Values are generated by selecting value size in the configured range + * and generating a pseudo-random sequence of bytes seeded by key, column + * qualifier, and value size. + *
<p>
    + * Not thread-safe, so a separate instance is needed for every writer thread/ + */ +public class LoadTestKVGenerator { + + /** A random number generator for determining value size */ + private Random randomForValueSize = new Random(); + + private final int minValueSize; + private final int maxValueSize; + + public LoadTestKVGenerator(int minValueSize, int maxValueSize) { + if (minValueSize <= 0 || maxValueSize <= 0) { + throw new IllegalArgumentException("Invalid min/max value sizes: " + + minValueSize + ", " + maxValueSize); + } + this.minValueSize = minValueSize; + this.maxValueSize = maxValueSize; + } + + /** + * Verifies that the given byte array is the same as what would be generated + * for the given row key and qualifier. We are assuming that the value size + * is correct, and only verify the actual bytes. However, if the min/max + * value sizes are set sufficiently high, an accidental match should be + * extremely improbable. + */ + public static boolean verify(String rowKey, String qual, byte[] value) { + byte[] expectedData = getValueForRowColumn(rowKey, qual, value.length); + return Bytes.equals(expectedData, value); + } + + /** + * Converts the given key to string, and prefixes it with the MD5 hash of + * the index's string representation. + */ + public static String md5PrefixedKey(long key) { + String stringKey = Long.toString(key); + String md5hash = MD5Hash.getMD5AsHex(Bytes.toBytes(stringKey)); + + // flip the key to randomize + return md5hash + ":" + stringKey; + } + + /** + * Generates a value for the given key index and column qualifier. Size is + * selected randomly in the configured range. The generated value depends + * only on the combination of the key, qualifier, and the selected value + * size. This allows to verify the actual value bytes when reading, as done + * in {@link #verify(String, String, byte[])}. + */ + public byte[] generateRandomSizeValue(long key, String qual) { + String rowKey = md5PrefixedKey(key); + int dataSize = minValueSize + randomForValueSize.nextInt( + Math.abs(maxValueSize - minValueSize)); + return getValueForRowColumn(rowKey, qual, dataSize); + } + + /** + * Generates random bytes of the given size for the given row and column + * qualifier. The random seed is fully determined by these parameters. + */ + private static byte[] getValueForRowColumn(String rowKey, String qual, + int dataSize) { + Random seededRandom = new Random(rowKey.hashCode() + qual.hashCode() + + dataSize); + byte[] randomBytes = new byte[dataSize]; + seededRandom.nextBytes(randomBytes); + return randomBytes; + } + +} diff --git a/src/test/java/org/apache/hadoop/hbase/util/LoadTestTool.java b/src/test/java/org/apache/hadoop/hbase/util/LoadTestTool.java new file mode 100644 index 00000000000..2e175da70bd --- /dev/null +++ b/src/test/java/org/apache/hadoop/hbase/util/LoadTestTool.java @@ -0,0 +1,305 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with this + * work for additional information regarding copyright ownership. The ASF + * licenses this file to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance with the License. 
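
A minimal sketch of exercising the generator above, assuming only the LoadTestKVGenerator class added by this patch; the key index 42, the qualifier "0", and the wrapper class name are illustrative only:

  import org.apache.hadoop.hbase.util.LoadTestKVGenerator;

  public class KVGeneratorSketch {
    public static void main(String[] args) {
      // Value sizes fall in [256, 1023]: min + nextInt(max - min) excludes the max.
      LoadTestKVGenerator gen = new LoadTestKVGenerator(256, 1024);

      String rowKey = LoadTestKVGenerator.md5PrefixedKey(42);  // md5 hex of "42", then ":42"
      byte[] value = gen.generateRandomSizeValue(42, "0");     // deterministic for (42, "0", chosen size)

      System.out.println(LoadTestKVGenerator.verify(rowKey, "0", value));  // true
      value[0]++;                                              // corrupt a single byte
      System.out.println(LoadTestKVGenerator.verify(rowKey, "0", value));  // false
    }
  }
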
+ * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations + * under the License. + */ +package org.apache.hadoop.hbase.util; + +import java.io.IOException; +import java.util.Arrays; + +import org.apache.commons.cli.CommandLine; +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.hbase.HBaseTestingUtility; +import org.apache.hadoop.hbase.HColumnDescriptor; +import org.apache.hadoop.hbase.HConstants; +import org.apache.hadoop.hbase.HTableDescriptor; +import org.apache.hadoop.hbase.PerformanceEvaluation; +import org.apache.hadoop.hbase.client.HBaseAdmin; +import org.apache.hadoop.hbase.io.hfile.Compression; +import org.apache.hadoop.hbase.regionserver.StoreFile; + +/** + * A command-line utility that reads, writes, and verifies data. Unlike + * {@link PerformanceEvaluation}, this tool validates the data written, + * and supports simultaneously writing and reading the same set of keys. + */ +public class LoadTestTool extends AbstractHBaseTool { + + private static final Log LOG = LogFactory.getLog(LoadTestTool.class); + + /** Table name for the test */ + private byte[] tableName; + + /** Table name to use of not overridden on the command line */ + private static final String DEFAULT_TABLE_NAME = "cluster_test"; + + /** Column family used by the test */ + static byte[] COLUMN_FAMILY = Bytes.toBytes("test_cf"); + + /** Column families used by the test */ + static final byte[][] COLUMN_FAMILIES = { COLUMN_FAMILY }; + + /** The number of reader/writer threads if not specified */ + private static final int DEFAULT_NUM_THREADS = 20; + + /** Usage string for the load option */ + private static final String OPT_USAGE_LOAD = + ":" + + "[:<#threads=" + DEFAULT_NUM_THREADS + ">]"; + + /** Usa\ge string for the read option */ + private static final String OPT_USAGE_READ = + "[:<#threads=" + DEFAULT_NUM_THREADS + ">]"; + + private static final String OPT_USAGE_BLOOM = "Bloom filter type, one of " + + Arrays.toString(StoreFile.BloomType.values()); + + private static final String OPT_USAGE_COMPRESSION = "Compression type, " + + "one of " + Arrays.toString(Compression.Algorithm.values()); + + private static final String OPT_BLOOM = "bloom"; + private static final String OPT_COMPRESSION = "compression"; + private static final String OPT_KEY_WINDOW = "key_window"; + private static final String OPT_WRITE = "write"; + private static final String OPT_MAX_READ_ERRORS = "max_read_errors"; + private static final String OPT_MULTIPUT = "multiput"; + private static final String OPT_NUM_KEYS = "num_keys"; + private static final String OPT_READ = "read"; + private static final String OPT_START_KEY = "start_key"; + private static final String OPT_TABLE_NAME = "tn"; + private static final String OPT_ZK_QUORUM = "zk"; + + /** This will be removed as we factor out the dependency on command line */ + private CommandLine cmd; + + private MultiThreadedWriter writerThreads = null; + private MultiThreadedReader readerThreads = null; + + private long startKey, endKey; + + private boolean isWrite, isRead; + + // Writer options + private int numWriterThreads = DEFAULT_NUM_THREADS; + private long minColsPerKey, maxColsPerKey; + private 
int minColDataSize, maxColDataSize; + private boolean isMultiPut; + + // Reader options + private int numReaderThreads = DEFAULT_NUM_THREADS; + private int keyWindow = MultiThreadedReader.DEFAULT_KEY_WINDOW; + private int maxReadErrors = MultiThreadedReader.DEFAULT_MAX_ERRORS; + private int verifyPercent; + + /** Create tables if needed. */ + public void createTables() throws IOException { + HBaseTestingUtility.createPreSplitLoadTestTable(conf, tableName, + COLUMN_FAMILY); + applyBloomFilterAndCompression(tableName, COLUMN_FAMILIES); + } + + private String[] splitColonSeparated(String option, + int minNumCols, int maxNumCols) { + String optVal = cmd.getOptionValue(option); + String[] cols = optVal.split(":"); + if (cols.length < minNumCols || cols.length > maxNumCols) { + throw new IllegalArgumentException("Expected at least " + + minNumCols + " columns but no more than " + maxNumCols + + " in the colon-separated value '" + optVal + "' of the " + + "-" + option + " option"); + } + return cols; + } + + private int getNumThreads(String numThreadsStr) { + return parseInt(numThreadsStr, 1, Short.MAX_VALUE); + } + + /** + * Apply the given Bloom filter type to all column families we care about. + */ + private void applyBloomFilterAndCompression(byte[] tableName, + byte[][] columnFamilies) throws IOException { + String bloomStr = cmd.getOptionValue(OPT_BLOOM); + StoreFile.BloomType bloomType = bloomStr == null ? null : + StoreFile.BloomType.valueOf(bloomStr); + + String compressStr = cmd.getOptionValue(OPT_COMPRESSION); + Compression.Algorithm compressAlgo = compressStr == null ? null : + Compression.Algorithm.valueOf(compressStr); + + if (bloomStr == null && compressStr == null) + return; + + HBaseAdmin admin = new HBaseAdmin(conf); + HTableDescriptor tableDesc = admin.getTableDescriptor(tableName); + LOG.info("Disabling table " + Bytes.toString(tableName)); + admin.disableTable(tableName); + for (byte[] cf : columnFamilies) { + HColumnDescriptor columnDesc = tableDesc.getFamily(cf); + if (bloomStr != null) + columnDesc.setBloomFilterType(bloomType); + if (compressStr != null) + columnDesc.setCompressionType(compressAlgo); + admin.modifyColumn(tableName, columnDesc); + } + LOG.info("Enabling table " + Bytes.toString(tableName)); + admin.enableTable(tableName); + } + + @Override + protected void addOptions() { + addOptWithArg(OPT_ZK_QUORUM, "ZK quorum as comma-separated host names " + + "without port numbers"); + addOptWithArg(OPT_TABLE_NAME, "The name of the table to read or write"); + addOptWithArg(OPT_WRITE, OPT_USAGE_LOAD); + addOptWithArg(OPT_READ, OPT_USAGE_READ); + addOptWithArg(OPT_BLOOM, OPT_USAGE_BLOOM); + addOptWithArg(OPT_COMPRESSION, OPT_USAGE_COMPRESSION); + addOptWithArg(OPT_MAX_READ_ERRORS, "The maximum number of read errors " + + "to tolerate before terminating all reader threads. The default is " + + MultiThreadedReader.DEFAULT_MAX_ERRORS + "."); + addOptWithArg(OPT_KEY_WINDOW, "The 'key window' to maintain between " + + "reads and writes for concurrent write/read workload. 
The default " + + "is " + MultiThreadedReader.DEFAULT_KEY_WINDOW + "."); + addOptNoArg(OPT_MULTIPUT, "Whether to use multi-puts as opposed to " + + "separate puts for every column in a row"); + + addRequiredOptWithArg(OPT_NUM_KEYS, "The number of keys to read/write"); + addRequiredOptWithArg(OPT_START_KEY, "The first key to read/write"); + } + + @Override + protected void processOptions(CommandLine cmd) { + this.cmd = cmd; + + tableName = Bytes.toBytes(cmd.getOptionValue(OPT_TABLE_NAME, + DEFAULT_TABLE_NAME)); + startKey = parseLong(cmd.getOptionValue(OPT_START_KEY), 0, + Long.MAX_VALUE); + long numKeys = parseLong(cmd.getOptionValue(OPT_NUM_KEYS), 1, + Long.MAX_VALUE - startKey); + endKey = startKey + numKeys; + + isWrite = cmd.hasOption(OPT_WRITE); + isRead = cmd.hasOption(OPT_READ); + + if (!isWrite && !isRead) { + throw new IllegalArgumentException("Either -" + OPT_WRITE + " or " + + "-" + OPT_READ + " has to be specified"); + } + + if (isWrite) { + String[] writeOpts = splitColonSeparated(OPT_WRITE, 2, 3); + + int colIndex = 0; + minColsPerKey = 1; + maxColsPerKey = 2 * Long.parseLong(writeOpts[colIndex++]); + int avgColDataSize = + parseInt(writeOpts[colIndex++], 1, Integer.MAX_VALUE); + minColDataSize = avgColDataSize / 2; + maxColDataSize = avgColDataSize * 3 / 2; + + if (colIndex < writeOpts.length) { + numWriterThreads = getNumThreads(writeOpts[colIndex++]); + } + + isMultiPut = cmd.hasOption(OPT_MULTIPUT); + + System.out.println("Multi-puts: " + isMultiPut); + System.out.println("Columns per key: " + minColsPerKey + ".." + + maxColsPerKey); + System.out.println("Data size per column: " + minColDataSize + ".." + + maxColDataSize); + } + + if (isRead) { + String[] readOpts = splitColonSeparated(OPT_READ, 1, 2); + int colIndex = 0; + verifyPercent = parseInt(readOpts[colIndex++], 0, 100); + if (colIndex < readOpts.length) { + numReaderThreads = getNumThreads(readOpts[colIndex++]); + } + + if (cmd.hasOption(OPT_MAX_READ_ERRORS)) { + maxReadErrors = parseInt(cmd.getOptionValue(OPT_MAX_READ_ERRORS), + 0, Integer.MAX_VALUE); + } + + if (cmd.hasOption(OPT_KEY_WINDOW)) { + keyWindow = parseInt(cmd.getOptionValue(OPT_KEY_WINDOW), + 0, Integer.MAX_VALUE); + } + + System.out.println("Percent of keys to verify: " + verifyPercent); + System.out.println("Reader threads: " + numReaderThreads); + } + + System.out.println("Key range: " + startKey + ".." 
+ (endKey - 1)); + } + + @Override + protected void doWork() throws IOException { + if (cmd.hasOption(OPT_ZK_QUORUM)) { + conf.set(HConstants.ZOOKEEPER_QUORUM, cmd.getOptionValue(OPT_ZK_QUORUM)); + } + + createTables(); + + if (isWrite) { + writerThreads = new MultiThreadedWriter(conf, tableName, COLUMN_FAMILY); + writerThreads.setMultiPut(isMultiPut); + writerThreads.setColumnsPerKey(minColsPerKey, maxColsPerKey); + writerThreads.setDataSize(minColDataSize, maxColDataSize); + } + + if (isRead) { + readerThreads = new MultiThreadedReader(conf, tableName, COLUMN_FAMILY, + verifyPercent); + readerThreads.setMaxErrors(maxReadErrors); + readerThreads.setKeyWindow(keyWindow); + } + + if (isRead && isWrite) { + LOG.info("Concurrent read/write workload: making readers aware of the " + + "write point"); + readerThreads.linkToWriter(writerThreads); + } + + if (isWrite) { + System.out.println("Starting to write data..."); + writerThreads.start(startKey, endKey, numWriterThreads); + } + + if (isRead) { + System.out.println("Starting to read data..."); + readerThreads.start(startKey, endKey, numReaderThreads); + } + + if (isWrite) { + writerThreads.waitForFinish(); + } + + if (isRead) { + readerThreads.waitForFinish(); + } + } + + public static void main(String[] args) { + new LoadTestTool().doStaticMain(args); + } + +} diff --git a/src/test/java/org/apache/hadoop/hbase/util/MultiThreadedAction.java b/src/test/java/org/apache/hadoop/hbase/util/MultiThreadedAction.java new file mode 100644 index 00000000000..b312cca1d84 --- /dev/null +++ b/src/test/java/org/apache/hadoop/hbase/util/MultiThreadedAction.java @@ -0,0 +1,205 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with this + * work for additional information regarding copyright ownership. The ASF + * licenses this file to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations + * under the License. + */ +package org.apache.hadoop.hbase.util; + +import java.io.IOException; +import java.util.Collection; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.atomic.AtomicLong; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.util.StringUtils; + +/** + * Common base class for reader and writer parts of multi-thread HBase load + * test ({@link LoadTestTool}). 
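
The doWork() wiring above can also be driven directly from test code, bypassing option parsing. For reference, a -write value of 10:512 is expanded by processOptions() into 1..20 columns per key and 256..768-byte values; the sketch below hard-codes that expansion and assumes the tool's default table cluster_test with family test_cf already exists (the wrapper class, key count, and thread counts are illustrative only):

  import org.apache.hadoop.conf.Configuration;
  import org.apache.hadoop.hbase.HBaseConfiguration;
  import org.apache.hadoop.hbase.util.Bytes;
  import org.apache.hadoop.hbase.util.MultiThreadedReader;
  import org.apache.hadoop.hbase.util.MultiThreadedWriter;

  public class LoadSketch {
    public static void main(String[] args) throws Exception {
      Configuration conf = HBaseConfiguration.create();
      byte[] table = Bytes.toBytes("cluster_test");   // LoadTestTool's default table
      byte[] cf = Bytes.toBytes("test_cf");           // LoadTestTool's default column family

      MultiThreadedWriter writer = new MultiThreadedWriter(conf, table, cf);
      writer.setMultiPut(true);
      writer.setColumnsPerKey(1, 20);                 // what "-write 10:512" expands to
      writer.setDataSize(256, 768);

      MultiThreadedReader reader = new MultiThreadedReader(conf, table, cf, 10); // verify ~10% of reads
      reader.linkToWriter(writer);                    // readers trail the writers' insertion point

      writer.start(0, 1000000, 20);                   // keys [0, 1000000), 20 writer threads
      reader.start(0, 1000000, 20);                   // 20 reader threads over the same range
      writer.waitForFinish();
      reader.waitForFinish();
    }
  }
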
+ */ +public abstract class MultiThreadedAction { + private static final Log LOG = LogFactory.getLog(MultiThreadedAction.class); + + protected final byte[] tableName; + protected final byte[] columnFamily; + protected final Configuration conf; + + protected int numThreads = 1; + + /** The start key of the key range, inclusive */ + protected long startKey = 0; + + /** The end key of the key range, exclusive */ + protected long endKey = 1; + + protected AtomicInteger numThreadsWorking = new AtomicInteger(); + protected AtomicLong numKeys = new AtomicLong(); + protected AtomicLong numCols = new AtomicLong(); + protected AtomicLong totalOpTimeMs = new AtomicLong(); + protected boolean verbose = false; + + protected int minDataSize = 256; + protected int maxDataSize = 1024; + + /** "R" or "W" */ + private String actionLetter; + + /** Whether we need to print out Hadoop Streaming-style counters */ + private boolean streamingCounters; + + public static final int REPORTING_INTERVAL_MS = 5000; + + public MultiThreadedAction(Configuration conf, byte[] tableName, + byte[] columnFamily, String actionLetter) { + this.conf = conf; + this.tableName = tableName; + this.columnFamily = columnFamily; + this.actionLetter = actionLetter; + } + + public void start(long startKey, long endKey, int numThreads) + throws IOException { + this.startKey = startKey; + this.endKey = endKey; + this.numThreads = numThreads; + (new Thread(new ProgressReporter(actionLetter))).start(); + } + + private static String formatTime(long elapsedTime) { + String format = String.format("%%0%dd", 2); + elapsedTime = elapsedTime / 1000; + String seconds = String.format(format, elapsedTime % 60); + String minutes = String.format(format, (elapsedTime % 3600) / 60); + String hours = String.format(format, elapsedTime / 3600); + String time = hours + ":" + minutes + ":" + seconds; + return time; + } + + /** Asynchronously reports progress */ + private class ProgressReporter implements Runnable { + + private String reporterId = ""; + + public ProgressReporter(String id) { + this.reporterId = id; + } + + @Override + public void run() { + long startTime = System.currentTimeMillis(); + long priorNumKeys = 0; + long priorCumulativeOpTime = 0; + int priorAverageKeysPerSecond = 0; + + // Give other threads time to start. + Threads.sleep(REPORTING_INTERVAL_MS); + + while (numThreadsWorking.get() != 0) { + String threadsLeft = + "[" + reporterId + ":" + numThreadsWorking.get() + "] "; + if (numKeys.get() == 0) { + LOG.info(threadsLeft + "Number of keys = 0"); + } else { + long numKeys = MultiThreadedAction.this.numKeys.get(); + long time = System.currentTimeMillis() - startTime; + long totalOpTime = totalOpTimeMs.get(); + + long numKeysDelta = numKeys - priorNumKeys; + long totalOpTimeDelta = totalOpTime - priorCumulativeOpTime; + + double averageKeysPerSecond = + (time > 0) ? (numKeys * 1000 / time) : 0; + + LOG.info(threadsLeft + + "Keys=" + + numKeys + + ", cols=" + + StringUtils.humanReadableInt(numCols.get()) + + ", time=" + + formatTime(time) + + ((numKeys > 0 && time > 0) ? (" Overall: [" + "keys/s= " + + numKeys * 1000 / time + ", latency=" + totalOpTime + / numKeys + " ms]") : "") + + ((numKeysDelta > 0) ? 
(" Current: [" + "keys/s=" + + numKeysDelta * 1000 / REPORTING_INTERVAL_MS + ", latency=" + + totalOpTimeDelta / numKeysDelta + " ms]") : "") + + progressInfo()); + + if (streamingCounters) { + printStreamingCounters(numKeysDelta, + averageKeysPerSecond - priorAverageKeysPerSecond); + } + + priorNumKeys = numKeys; + priorCumulativeOpTime = totalOpTime; + priorAverageKeysPerSecond = (int) averageKeysPerSecond; + } + + Threads.sleep(REPORTING_INTERVAL_MS); + } + } + + private void printStreamingCounters(long numKeysDelta, + double avgKeysPerSecondDelta) { + // Write stats in a format that can be interpreted as counters by + // streaming map-reduce jobs. + System.err.println("reporter:counter:numKeys," + reporterId + "," + + numKeysDelta); + System.err.println("reporter:counter:numCols," + reporterId + "," + + numCols.get()); + System.err.println("reporter:counter:avgKeysPerSecond," + reporterId + + "," + (long) (avgKeysPerSecondDelta)); + } + } + + public void setDataSize(int minDataSize, int maxDataSize) { + this.minDataSize = minDataSize; + this.maxDataSize = maxDataSize; + } + + public void waitForFinish() { + while (numThreadsWorking.get() != 0) { + Threads.sleepWithoutInterrupt(1000); + } + } + + protected void startThreads(Collection threads) { + numThreadsWorking.addAndGet(threads.size()); + for (Thread thread : threads) { + thread.start(); + } + } + + /** @return the end key of the key range, exclusive */ + public long getEndKey() { + return endKey; + } + + /** Returns a task-specific progress string */ + protected abstract String progressInfo(); + + protected static void appendToStatus(StringBuilder sb, String desc, + long v) { + if (v == 0) { + return; + } + sb.append(", "); + sb.append(desc); + sb.append("="); + sb.append(v); + } + +} diff --git a/src/test/java/org/apache/hadoop/hbase/util/MultiThreadedReader.java b/src/test/java/org/apache/hadoop/hbase/util/MultiThreadedReader.java new file mode 100644 index 00000000000..4fd14519296 --- /dev/null +++ b/src/test/java/org/apache/hadoop/hbase/util/MultiThreadedReader.java @@ -0,0 +1,320 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with this + * work for additional information regarding copyright ownership. The ASF + * licenses this file to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations + * under the License. 
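
For orientation, the reporter above logs one line every REPORTING_INTERVAL_MS per action. Given the concatenation in ProgressReporter.run() and the writer's progressInfo(), a line would look roughly like the following (all numbers invented for illustration):

  [W:20] Keys=125000, cols=1.2m, time=00:02:05 Overall: [keys/s= 1000, latency=18 ms] Current: [keys/s=1100, latency=17 ms], insertedUpTo=124950, insertedQSize=12
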
+ */ +package org.apache.hadoop.hbase.util; + +import java.io.IOException; +import java.util.HashSet; +import java.util.List; +import java.util.Random; +import java.util.Set; +import java.util.concurrent.atomic.AtomicLong; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hbase.HRegionLocation; +import org.apache.hadoop.hbase.KeyValue; +import org.apache.hadoop.hbase.client.Get; +import org.apache.hadoop.hbase.client.HTable; +import org.apache.hadoop.hbase.client.Result; + +/** Creates multiple threads that read and verify previously written data */ +public class MultiThreadedReader extends MultiThreadedAction +{ + private static final Log LOG = LogFactory.getLog(MultiThreadedReader.class); + + private Set readers = new HashSet(); + private final double verifyPercent; + private volatile boolean aborted; + + private MultiThreadedWriter writer = null; + + /** + * The number of keys verified in a sequence. This will never be larger than + * the total number of keys in the range. The reader might also verify + * random keys when it catches up with the writer. + */ + private final AtomicLong numUniqueKeysVerified = new AtomicLong(); + + /** + * Default maximum number of read errors to tolerate before shutting down all + * readers. + */ + public static final int DEFAULT_MAX_ERRORS = 10; + + /** + * Default "window" size between the last key written by the writer and the + * key that we attempt to read. The lower this number, the stricter our + * testing is. If this is zero, we always attempt to read the highest key + * in the contiguous sequence of keys written by the writers. + */ + public static final int DEFAULT_KEY_WINDOW = 0; + + protected AtomicLong numKeysVerified = new AtomicLong(0); + private AtomicLong numReadErrors = new AtomicLong(0); + private AtomicLong numReadFailures = new AtomicLong(0); + + private int maxErrors = DEFAULT_MAX_ERRORS; + private int keyWindow = DEFAULT_KEY_WINDOW; + + public MultiThreadedReader(Configuration conf, byte[] tableName, + byte[] columnFamily, double verifyPercent) { + super(conf, tableName, columnFamily, "R"); + this.verifyPercent = verifyPercent; + } + + public void linkToWriter(MultiThreadedWriter writer) { + this.writer = writer; + writer.setTrackInsertedKeys(true); + } + + public void setMaxErrors(int maxErrors) { + this.maxErrors = maxErrors; + } + + public void setKeyWindow(int keyWindow) { + this.keyWindow = keyWindow; + } + + @Override + public void start(long startKey, long endKey, int numThreads) + throws IOException { + super.start(startKey, endKey, numThreads); + if (verbose) { + LOG.debug("Reading keys [" + startKey + ", " + endKey + ")"); + } + + for (int i = 0; i < numThreads; ++i) { + HBaseReaderThread reader = new HBaseReaderThread(i); + readers.add(reader); + } + startThreads(readers); + } + + public class HBaseReaderThread extends Thread { + private final int readerId; + private final HTable table; + private final Random random = new Random(); + + /** The "current" key being read. Increases from startKey to endKey. */ + private long curKey; + + /** Time when the thread started */ + private long startTimeMs; + + /** If we are ahead of the writer and reading a random key. 
*/ + private boolean readingRandomKey; + + /** + * @param readerId only the keys with this remainder from division by + * {@link #numThreads} will be read by this thread + */ + public HBaseReaderThread(int readerId) throws IOException { + this.readerId = readerId; + table = new HTable(conf, tableName); + setName(getClass().getSimpleName() + "_" + readerId); + } + + @Override + public void run() { + try { + runReader(); + } finally { + try { + table.close(); + } catch (IOException e) { + LOG.error("Error closing table", e); + } + numThreadsWorking.decrementAndGet(); + } + } + + private void runReader() { + if (verbose) { + LOG.info("Started thread #" + readerId + " for reads..."); + } + + startTimeMs = System.currentTimeMillis(); + curKey = startKey; + while (curKey < endKey && !aborted) { + long k = getNextKeyToRead(); + + // A sanity check for the key range. + if (k < startKey || k >= endKey) { + numReadErrors.incrementAndGet(); + throw new AssertionError("Load tester logic error: proposed key " + + "to read " + k + " is out of range (startKey=" + startKey + + ", endKey=" + endKey + ")"); + } + + if (k % numThreads != readerId || + writer != null && writer.failedToWriteKey(k)) { + // Skip keys that this thread should not read, as well as the keys + // that we know the writer failed to write. + continue; + } + + readKey(k); + if (k == curKey - 1 && !readingRandomKey) { + // We have verified another unique key. + numUniqueKeysVerified.incrementAndGet(); + } + } + } + + /** + * Should only be used for the concurrent writer/reader workload. The + * maximum key we are allowed to read, subject to the "key window" + * constraint. + */ + private long maxKeyWeCanRead() { + long insertedUpToKey = writer.insertedUpToKey(); + if (insertedUpToKey >= endKey - 1) { + // The writer has finished writing our range, so we can read any + // key in the range. + return endKey - 1; + } + return Math.min(endKey - 1, writer.insertedUpToKey() - keyWindow); + } + + private long getNextKeyToRead() { + readingRandomKey = false; + if (writer == null || curKey <= maxKeyWeCanRead()) { + return curKey++; + } + + // We caught up with the writer. See if we can read any keys at all. + long maxKeyToRead; + while ((maxKeyToRead = maxKeyWeCanRead()) < startKey) { + // The writer has not written sufficient keys for us to be able to read + // anything at all. Sleep a bit. This should only happen in the + // beginning of a load test run. + Threads.sleepWithoutInterrupt(50); + } + + if (curKey <= maxKeyToRead) { + // The writer wrote some keys, and we are now allowed to read our + // current key. + return curKey++; + } + + // startKey <= maxKeyToRead <= curKey - 1. Read one of the previous keys. + // Don't increment the current key -- we still have to try reading it + // later. Set a flag to make sure that we don't count this key towards + // the set of unique keys we have verified. 
+ readingRandomKey = true; + return startKey + Math.abs(random.nextLong()) + % (maxKeyToRead - startKey + 1); + } + + private Get readKey(long keyToRead) { + Get get = new Get( + LoadTestKVGenerator.md5PrefixedKey(keyToRead).getBytes()); + get.addFamily(columnFamily); + + try { + if (verbose) { + LOG.info("[" + readerId + "] " + "Querying key " + keyToRead + + ", cf " + Bytes.toStringBinary(columnFamily)); + } + queryKey(get, random.nextInt(100) < verifyPercent); + } catch (IOException e) { + numReadFailures.addAndGet(1); + LOG.debug("[" + readerId + "] FAILED read, key = " + (keyToRead + "") + + ", time from start: " + + (System.currentTimeMillis() - startTimeMs) + " ms"); + } + return get; + } + + public void queryKey(Get get, boolean verify) throws IOException { + String rowKey = new String(get.getRow()); + + // read the data + long start = System.currentTimeMillis(); + Result result = table.get(get); + totalOpTimeMs.addAndGet(System.currentTimeMillis() - start); + numKeys.addAndGet(1); + + // if we got no data report error + if (result.isEmpty()) { + HRegionLocation hloc = table.getRegionLocation( + Bytes.toBytes(rowKey)); + LOG.info("Key = " + rowKey + ", RegionServer: " + + hloc.getHostname()); + numReadErrors.addAndGet(1); + LOG.error("No data returned, tried to get actions for key = " + + rowKey + (writer == null ? "" : ", keys inserted by writer: " + + writer.numKeys.get() + ")")); + + if (numReadErrors.get() > maxErrors) { + LOG.error("Aborting readers -- found more than " + maxErrors + + " errors\n"); + aborted = true; + } + } + + if (result.getFamilyMap(columnFamily) != null) { + // increment number of columns read + numCols.addAndGet(result.getFamilyMap(columnFamily).size()); + + if (verify) { + // verify the result + List keyValues = result.list(); + for (KeyValue kv : keyValues) { + String qual = new String(kv.getQualifier()); + + // if something does not look right report it + if (!LoadTestKVGenerator.verify(rowKey, qual, kv.getValue())) { + numReadErrors.addAndGet(1); + LOG.error("Error checking data for key = " + rowKey + + ", actionId = " + qual); + } + } + numKeysVerified.addAndGet(1); + } + } + } + + } + + public long getNumReadFailures() { + return numReadFailures.get(); + } + + public long getNumReadErrors() { + return numReadErrors.get(); + } + + public long getNumKeysVerified() { + return numKeysVerified.get(); + } + + public long getNumUniqueKeysVerified() { + return numUniqueKeysVerified.get(); + } + + @Override + protected String progressInfo() { + StringBuilder sb = new StringBuilder(); + appendToStatus(sb, "verified", numKeysVerified.get()); + appendToStatus(sb, "READ FAILURES", numReadFailures.get()); + appendToStatus(sb, "READ ERRORS", numReadErrors.get()); + return sb.toString(); + } + +} diff --git a/src/test/java/org/apache/hadoop/hbase/util/MultiThreadedWriter.java b/src/test/java/org/apache/hadoop/hbase/util/MultiThreadedWriter.java new file mode 100644 index 00000000000..c2447b02d8d --- /dev/null +++ b/src/test/java/org/apache/hadoop/hbase/util/MultiThreadedWriter.java @@ -0,0 +1,310 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with this + * work for additional information regarding copyright ownership. The ASF + * licenses this file to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance with the License. 
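
To make the key-window constraint above concrete, assuming the end of the key range is well beyond these values: if the writers have contiguously inserted keys 0..99 (insertedUpToKey() returns 99) and -key_window is 5, then maxKeyWeCanRead() is min(endKey - 1, 99 - 5) = 94, so a reader whose current key is 95 does not advance; it re-reads a random key in [startKey, 94] and only consumes key 95 once insertedUpToKey() reaches 100. With the default window of 0 the reader may go right up to key 99, the newest key in the contiguous inserted range.
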
+ * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations + * under the License. + */ +package org.apache.hadoop.hbase.util; + +import java.io.IOException; +import java.util.HashSet; +import java.util.PriorityQueue; +import java.util.Queue; +import java.util.Random; +import java.util.Set; +import java.util.concurrent.ArrayBlockingQueue; +import java.util.concurrent.BlockingQueue; +import java.util.concurrent.ConcurrentSkipListSet; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicLong; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hbase.client.HTable; +import org.apache.hadoop.hbase.client.Put; + +/** Creates multiple threads that write key/values into the */ +public class MultiThreadedWriter extends MultiThreadedAction { + private static final Log LOG = LogFactory.getLog(MultiThreadedWriter.class); + + private long minColumnsPerKey = 1; + private long maxColumnsPerKey = 10; + private Set writers = new HashSet(); + + private boolean isMultiPut = false; + + /** + * A temporary place to keep track of inserted keys. This is written to by + * all writers and is drained on a separate thread that populates + * {@link #insertedUpToKey}, the maximum key in the contiguous range of keys + * being inserted. This queue is supposed to stay small. + */ + private BlockingQueue insertedKeys = + new ArrayBlockingQueue(10000); + + /** + * This is the current key to be inserted by any thread. Each thread does an + * atomic get and increment operation and inserts the current value. + */ + private AtomicLong nextKeyToInsert = new AtomicLong(); + + /** + * The highest key in the contiguous range of keys . + */ + private AtomicLong insertedUpToKey = new AtomicLong(); + + /** The sorted set of keys NOT inserted by the writers */ + private Set failedKeySet = new ConcurrentSkipListSet(); + + /** + * The total size of the temporary inserted key set that have not yet lined + * up in a our contiguous sequence starting from startKey. Supposed to stay + * small. + */ + private AtomicLong insertedKeyQueueSize = new AtomicLong(); + + /** Enable this if used in conjunction with a concurrent reader. */ + private boolean trackInsertedKeys; + + public MultiThreadedWriter(Configuration conf, byte[] tableName, + byte[] columnFamily) { + super(conf, tableName, columnFamily, "W"); + } + + /** Use multi-puts vs. 
separate puts for every column in a row */ + public void setMultiPut(boolean isMultiPut) { + this.isMultiPut = isMultiPut; + } + + public void setColumnsPerKey(long minColumnsPerKey, long maxColumnsPerKey) { + this.minColumnsPerKey = minColumnsPerKey; + this.maxColumnsPerKey = maxColumnsPerKey; + } + + @Override + public void start(long startKey, long endKey, int numThreads) + throws IOException { + super.start(startKey, endKey, numThreads); + + if (verbose) { + LOG.debug("Inserting keys [" + startKey + ", " + endKey + ")"); + } + + nextKeyToInsert.set(startKey); + insertedUpToKey.set(startKey - 1); + + for (int i = 0; i < numThreads; ++i) { + HBaseWriterThread writer = new HBaseWriterThread(i); + writers.add(writer); + } + + if (trackInsertedKeys) { + new Thread(new InsertedKeysTracker()).start(); + numThreadsWorking.incrementAndGet(); + } + + startThreads(writers); + } + + public static byte[] longToByteArrayKey(long rowKey) { + return LoadTestKVGenerator.md5PrefixedKey(rowKey).getBytes(); + } + + private class HBaseWriterThread extends Thread { + private final HTable table; + private final int writerId; + + private final Random random = new Random(); + private final LoadTestKVGenerator dataGenerator = new LoadTestKVGenerator( + minDataSize, maxDataSize); + + public HBaseWriterThread(int writerId) throws IOException { + setName(getClass().getSimpleName() + "_" + writerId); + table = new HTable(conf, tableName); + this.writerId = writerId; + } + + public void run() { + try { + long rowKey; + while ((rowKey = nextKeyToInsert.getAndIncrement()) < endKey) { + long numColumns = minColumnsPerKey + Math.abs(random.nextLong()) + % (maxColumnsPerKey - minColumnsPerKey); + numKeys.addAndGet(1); + if (isMultiPut) { + multiPutInsertKey(rowKey, 0, numColumns); + } else { + for (long col = 0; col < numColumns; ++col) { + insert(rowKey, col); + } + } + if (trackInsertedKeys) { + insertedKeys.add(rowKey); + } + } + } finally { + try { + table.close(); + } catch (IOException e) { + LOG.error("Error closing table", e); + } + numThreadsWorking.decrementAndGet(); + } + } + + public void insert(long rowKey, long col) { + Put put = new Put(longToByteArrayKey(rowKey)); + String colAsStr = String.valueOf(col); + put.add(columnFamily, colAsStr.getBytes(), + dataGenerator.generateRandomSizeValue(rowKey, colAsStr)); + try { + long start = System.currentTimeMillis(); + table.put(put); + numCols.addAndGet(1); + totalOpTimeMs.addAndGet(System.currentTimeMillis() - start); + } catch (IOException e) { + failedKeySet.add(rowKey); + LOG.error("Failed to insert: " + rowKey); + e.printStackTrace(); + } + } + + public void multiPutInsertKey(long rowKey, long startCol, long endCol) { + if (verbose) { + LOG.debug("Preparing put for key = " + rowKey + ", cols = [" + + startCol + ", " + endCol + ")"); + } + + if (startCol >= endCol) { + return; + } + + Put put = new Put(LoadTestKVGenerator.md5PrefixedKey( + rowKey).getBytes()); + byte[] columnQualifier; + byte[] value; + for (long i = startCol; i < endCol; ++i) { + String qualStr = String.valueOf(i); + columnQualifier = qualStr.getBytes(); + value = dataGenerator.generateRandomSizeValue(rowKey, qualStr); + put.add(columnFamily, columnQualifier, value); + } + + try { + long start = System.currentTimeMillis(); + table.put(put); + numCols.addAndGet(endCol - startCol); + totalOpTimeMs.addAndGet( + System.currentTimeMillis() - start); + } catch (IOException e) { + failedKeySet.add(rowKey); + e.printStackTrace(); + } + } + } + + /** + * A thread that keeps track of the highest key in 
the contiguous range of + * inserted keys. + */ + private class InsertedKeysTracker implements Runnable { + + @Override + public void run() { + Thread.currentThread().setName(getClass().getSimpleName()); + try { + long expectedKey = startKey; + Queue sortedKeys = new PriorityQueue(); + while (expectedKey < endKey) { + // Block until a new element is available. + Long k; + try { + k = insertedKeys.poll(1, TimeUnit.SECONDS); + } catch (InterruptedException e) { + LOG.info("Inserted key tracker thread interrupted", e); + break; + } + if (k == null) { + continue; + } + if (k == expectedKey) { + // Skip the "sorted key" queue and consume this key. + insertedUpToKey.set(k); + ++expectedKey; + } else { + sortedKeys.add(k); + } + + // See if we have a sequence of contiguous keys lined up. + while (!sortedKeys.isEmpty() + && ((k = sortedKeys.peek()) == expectedKey)) { + sortedKeys.poll(); + insertedUpToKey.set(k); + ++expectedKey; + } + + insertedKeyQueueSize.set(insertedKeys.size() + sortedKeys.size()); + } + } catch (Exception ex) { + LOG.error("Error in inserted key tracker", ex); + } finally { + numThreadsWorking.decrementAndGet(); + } + } + + } + + @Override + public void waitForFinish() { + super.waitForFinish(); + System.out.println("Failed to write keys: " + failedKeySet.size()); + for (Long key : failedKeySet) { + System.out.println("Failed to write key: " + key); + } + } + + public int getNumWriteFailures() { + return failedKeySet.size(); + } + + /** + * The max key until which all keys have been inserted (successfully or not). + * @return the last key that we have inserted all keys up to (inclusive) + */ + public long insertedUpToKey() { + return insertedUpToKey.get(); + } + + public boolean failedToWriteKey(long k) { + return failedKeySet.contains(k); + } + + @Override + protected String progressInfo() { + StringBuilder sb = new StringBuilder(); + appendToStatus(sb, "insertedUpTo", insertedUpToKey.get()); + appendToStatus(sb, "insertedQSize", insertedKeyQueueSize.get()); + return sb.toString(); + } + + /** + * Used for a joint write/read workload. Enables tracking the last inserted + * key, which requires a blocking queue and a consumer thread. + * @param enable whether to enable tracking the last inserted key + */ + void setTrackInsertedKeys(boolean enable) { + trackInsertedKeys = enable; + } + +} diff --git a/src/test/java/org/apache/hadoop/hbase/util/ProcessBasedLocalHBaseCluster.java b/src/test/java/org/apache/hadoop/hbase/util/ProcessBasedLocalHBaseCluster.java new file mode 100644 index 00000000000..05c97f0c445 --- /dev/null +++ b/src/test/java/org/apache/hadoop/hbase/util/ProcessBasedLocalHBaseCluster.java @@ -0,0 +1,339 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with this + * work for additional information regarding copyright ownership. The ASF + * licenses this file to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations + * under the License. 
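
A short walkthrough of the tracker above, with startKey = 0 and writer threads finishing out of order: if keys are reported in the order 0, 1, 3, 2, the tracker consumes 0 and 1 directly (insertedUpToKey advances to 1), parks 3 in the sorted queue because 2 is still expected, and after 2 arrives drains both 2 and 3, moving insertedUpToKey to 3. A linked reader therefore never observes a gap inside the range it is allowed to read.
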
+ */ +package org.apache.hadoop.hbase.util; + +import java.io.BufferedReader; +import java.io.BufferedWriter; +import java.io.File; +import java.io.FileWriter; +import java.io.IOException; +import java.io.InputStreamReader; +import java.util.ArrayList; +import java.util.Collections; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Scanner; +import java.util.TreeMap; + +import org.apache.commons.io.FileUtils; +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.RawLocalFileSystem; +import org.apache.hadoop.hbase.HBaseTestingUtility; +import org.apache.hadoop.hbase.HConstants; +import org.apache.hadoop.hbase.MiniHBaseCluster; +import org.apache.hadoop.hbase.client.HTable; +import org.apache.hadoop.hbase.zookeeper.ZKUtil; + +/** + * A helper class for process-based mini-cluster tests. Unlike + * {@link MiniHBaseCluster}, starts daemons as separate processes, allowing to + * do real kill testing. + */ +public class ProcessBasedLocalHBaseCluster { + + private static final String DEFAULT_WORKDIR = + "/tmp/hbase-" + System.getenv("USER"); + + private final String hbaseHome; + private final String workDir; + + private int numRegionServers; + private final int zkClientPort; + private final int masterPort; + + private final Configuration conf; + + private static final int MAX_FILE_SIZE_OVERRIDE = 10 * 1000 * 1000; + + private static final Log LOG = LogFactory.getLog( + ProcessBasedLocalHBaseCluster.class); + + private List daemonPidFiles = + Collections.synchronizedList(new ArrayList());; + + private boolean shutdownHookInstalled; + + private String hbaseDaemonScript; + + /** + * Constructor. Modifies the passed configuration. + * @param hbaseHome the top directory of the HBase source tree + */ + public ProcessBasedLocalHBaseCluster(Configuration conf, String hbaseHome, + int numRegionServers) { + this.conf = conf; + this.hbaseHome = hbaseHome; + this.numRegionServers = numRegionServers; + this.workDir = DEFAULT_WORKDIR; + + hbaseDaemonScript = hbaseHome + "/bin/hbase-daemon.sh"; + zkClientPort = HBaseTestingUtility.randomFreePort(); + masterPort = HBaseTestingUtility.randomFreePort(); + + conf.set(HConstants.ZOOKEEPER_QUORUM, HConstants.LOCALHOST); + conf.setInt(HConstants.ZOOKEEPER_CLIENT_PORT, zkClientPort); + } + + public void start() throws IOException { + cleanupOldState(); + + // start ZK + LOG.info("Starting ZooKeeper"); + startZK(); + + HBaseTestingUtility.waitForHostPort(HConstants.LOCALHOST, zkClientPort); + + startMaster(); + ZKUtil.waitForBaseZNode(conf); + + for (int idx = 0; idx < numRegionServers; idx++) { + startRegionServer(HBaseTestingUtility.randomFreePort()); + } + + LOG.info("Waiting for HBase startup by scanning META"); + int attemptsLeft = 10; + while (attemptsLeft-- > 0) { + try { + new HTable(conf, HConstants.META_TABLE_NAME); + } catch (Exception e) { + LOG.info("Waiting for HBase to startup. Retries left: " + attemptsLeft, + e); + Threads.sleep(1000); + } + } + + LOG.info("Process-based HBase Cluster with " + numRegionServers + + " region servers up and running... 
\n\n"); + } + + public void startRegionServer(int port) { + startServer("regionserver", port); + } + + public void startMaster() { + startServer("master", 0); + } + + public void killRegionServer(int port) throws IOException { + killServer("regionserver", port); + } + + public void killMaster() throws IOException { + killServer("master", 0); + } + + public void startZK() { + startServer("zookeeper", 0); + } + + private void executeCommand(String command) { + ensureShutdownHookInstalled(); + executeCommand(command, null); + } + + private void executeCommand(String command, Map envOverrides) { + LOG.debug("Command : " + command); + + try { + String [] envp = null; + if (envOverrides != null) { + Map map = new HashMap( + System.getenv()); + map.putAll(envOverrides); + envp = new String[map.size()]; + int idx = 0; + for (Map.Entry e: map.entrySet()) { + envp[idx++] = e.getKey() + "=" + e.getValue(); + } + } + + Process p = Runtime.getRuntime().exec(command, envp); + + BufferedReader stdInput = new BufferedReader( + new InputStreamReader(p.getInputStream())); + BufferedReader stdError = new BufferedReader( + new InputStreamReader(p.getErrorStream())); + + // read the output from the command + String s = null; + while ((s = stdInput.readLine()) != null) { + System.out.println(s); + } + + // read any errors from the attempted command + while ((s = stdError.readLine()) != null) { + System.out.println(s); + } + } catch (IOException e) { + LOG.error("Error running: " + command, e); + } + } + + private void shutdownAllProcesses() { + LOG.info("Killing daemons using pid files"); + final List pidFiles = new ArrayList(daemonPidFiles); + for (String pidFile : pidFiles) { + int pid = 0; + try { + pid = readPidFromFile(pidFile); + } catch (IOException ex) { + LOG.error("Could not kill process with pid from " + pidFile); + } + + if (pid > 0) { + LOG.info("Killing pid " + pid + " (" + pidFile + ")"); + killProcess(pid); + } + } + + LOG.info("Waiting a bit to let processes terminate"); + Threads.sleep(5000); + } + + private void ensureShutdownHookInstalled() { + if (shutdownHookInstalled) { + return; + } + + Runtime.getRuntime().addShutdownHook(new Thread(new Runnable() { + @Override + public void run() { + shutdownAllProcesses(); + } + })); + + shutdownHookInstalled = true; + } + + private void cleanupOldState() { + executeCommand("rm -rf " + workDir); + } + + private void writeStringToFile(String s, String fileName) { + try { + BufferedWriter out = new BufferedWriter(new FileWriter(fileName)); + out.write(s); + out.close(); + } catch (IOException e) { + LOG.error("Error writing to: " + fileName, e); + } + } + + private String serverWorkingDir(String serverName, int port) { + String dir; + if (serverName.equals("regionserver")) { + dir = workDir + "/" + serverName + "-" + port; + } else { + dir = workDir + "/" + serverName; + } + return dir; + } + + private int getServerPID(String serverName, int port) throws IOException { + String pidFile = pidFilePath(serverName, port); + return readPidFromFile(pidFile); + } + + private static int readPidFromFile(String pidFile) throws IOException { + Scanner scanner = new Scanner(new File(pidFile)); + try { + return scanner.nextInt(); + } finally { + scanner.close(); + } + } + + private String pidFilePath(String serverName, int port) { + String dir = serverWorkingDir(serverName, port); + String user = System.getenv("USER"); + String pidFile = String.format("%s/hbase-%s-%s.pid", + dir, user, serverName); + return pidFile; + } + + private void killServer(String serverName, 
int port) throws IOException { + int pid = getServerPID(serverName, port); + if (pid > 0) { + LOG.info("Killing " + serverName + "; pid=" + pid); + killProcess(pid); + } + } + + private void killProcess(int pid) { + String cmd = "kill -s KILL " + pid; + executeCommand(cmd); + } + + private void startServer(String serverName, int rsPort) { + String conf = generateConfig(rsPort); + + // create working directory for this region server. + String dir = serverWorkingDir(serverName, rsPort); + executeCommand("mkdir -p " + dir); + + writeStringToFile(conf, dir + "/" + "hbase-site.xml"); + + Map envOverrides = new HashMap(); + envOverrides.put("HBASE_LOG_DIR", dir); + envOverrides.put("HBASE_PID_DIR", dir); + try { + FileUtils.copyFile( + new File(hbaseHome, "conf/log4j.properties"), + new File(dir, "log4j.properties")); + } catch (IOException ex) { + LOG.error("Could not install log4j.properties into " + dir); + } + + executeCommand(hbaseDaemonScript + " --config " + dir + + " start " + serverName, envOverrides); + daemonPidFiles.add(pidFilePath(serverName, rsPort)); + } + + private final String generateConfig(int rsPort) { + StringBuilder sb = new StringBuilder(); + Map confMap = new TreeMap(); + confMap.put(HConstants.CLUSTER_DISTRIBUTED, true); + if (rsPort > 0) { + confMap.put(HConstants.REGIONSERVER_PORT, rsPort); + confMap.put(HConstants.REGIONSERVER_INFO_PORT_AUTO, true); + } + + confMap.put(HConstants.ZOOKEEPER_CLIENT_PORT, zkClientPort); + confMap.put(HConstants.MASTER_PORT, masterPort); + confMap.put(HConstants.HREGION_MAX_FILESIZE, MAX_FILE_SIZE_OVERRIDE); + confMap.put("fs.file.impl", RawLocalFileSystem.class.getName()); + + sb.append("\n"); + for (Map.Entry entry : confMap.entrySet()) { + sb.append(" \n"); + sb.append(" " + entry.getKey() + "\n"); + sb.append(" " + entry.getValue() + "\n"); + sb.append(" \n"); + } + sb.append("\n"); + return sb.toString(); + } + + public Configuration getConf() { + return conf; + } + +} diff --git a/src/test/java/org/apache/hadoop/hbase/util/RestartMetaTest.java b/src/test/java/org/apache/hadoop/hbase/util/RestartMetaTest.java new file mode 100644 index 00000000000..825846f1327 --- /dev/null +++ b/src/test/java/org/apache/hadoop/hbase/util/RestartMetaTest.java @@ -0,0 +1,155 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with this + * work for additional information regarding copyright ownership. The ASF + * licenses this file to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations + * under the License. 
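
RestartMetaTest below exercises this cluster end to end; a minimal lifecycle sketch, with a placeholder source-tree path and an illustrative wrapper class, is roughly:

  import org.apache.hadoop.conf.Configuration;
  import org.apache.hadoop.hbase.HBaseConfiguration;
  import org.apache.hadoop.hbase.HBaseTestingUtility;
  import org.apache.hadoop.hbase.util.ProcessBasedLocalHBaseCluster;

  public class KillTestSketch {
    public static void main(String[] args) throws Exception {
      Configuration conf = HBaseConfiguration.create();
      // "/path/to/hbase" stands in for the HBase source-tree checkout (hbaseHome).
      ProcessBasedLocalHBaseCluster cluster =
          new ProcessBasedLocalHBaseCluster(conf, "/path/to/hbase", 3);
      cluster.start();                         // ZK, master and 3 region servers as separate processes

      int metaPort = HBaseTestingUtility.getMetaRSPort(conf);
      cluster.killRegionServer(metaPort);      // sends kill -s KILL to the daemon's pid
      cluster.startRegionServer(metaPort);     // bring a region server back on the same port
    }
  }
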
+ */ +package org.apache.hadoop.hbase.util; + +import java.io.File; +import java.io.IOException; + +import org.apache.commons.cli.CommandLine; +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.hbase.HBaseTestingUtility; +import org.apache.hadoop.hbase.HConstants; +import org.apache.hadoop.hbase.client.HTable; +import org.apache.hadoop.hbase.client.Result; +import org.apache.hadoop.hbase.client.ResultScanner; +import org.apache.hadoop.hbase.client.Scan; + +/** + * A command-line tool that spins up a local process-based cluster, loads + * some data, restarts the regionserver holding .META., and verifies that the + * cluster recovers. + */ +public class RestartMetaTest extends AbstractHBaseTool { + + private static final Log LOG = LogFactory.getLog(RestartMetaTest.class); + + /** The number of region servers used if not specified */ + private static final int DEFAULT_NUM_RS = 2; + + /** Table name for the test */ + private static byte[] TABLE_NAME = Bytes.toBytes("load_test"); + + /** The number of seconds to sleep after loading the data */ + private static final int SLEEP_SEC_AFTER_DATA_LOAD = 5; + + /** The actual number of region servers */ + private int numRegionServers; + + /** HBase home source tree home directory */ + private String hbaseHome; + + private static final String OPT_HBASE_HOME = "hbase_home"; + private static final String OPT_NUM_RS = "num_rs"; + + /** Loads data into the table using the multi-threaded writer. */ + private void loadData() throws IOException { + long startKey = 0; + long endKey = 100000; + long minColsPerKey = 5; + long maxColsPerKey = 15; + int minColDataSize = 256; + int maxColDataSize = 256 * 3; + int numThreads = 10; + + // print out the arguments + System.out.printf("Key range %d .. 
%d\n", startKey, endKey); + System.out.printf("Number of Columns/Key: %d..%d\n", minColsPerKey, + maxColsPerKey); + System.out.printf("Data Size/Column: %d..%d bytes\n", minColDataSize, + maxColDataSize); + System.out.printf("Client Threads: %d\n", numThreads); + + // start the writers + MultiThreadedWriter writer = new MultiThreadedWriter(conf, TABLE_NAME, + LoadTestTool.COLUMN_FAMILY); + writer.setMultiPut(true); + writer.setColumnsPerKey(minColsPerKey, maxColsPerKey); + writer.setDataSize(minColDataSize, maxColDataSize); + writer.start(startKey, endKey, numThreads); + System.out.printf("Started loading data..."); + writer.waitForFinish(); + System.out.printf("Finished loading data..."); + } + + @Override + protected void doWork() throws IOException { + ProcessBasedLocalHBaseCluster hbaseCluster = + new ProcessBasedLocalHBaseCluster(conf, hbaseHome, numRegionServers); + + // start the process based HBase cluster + hbaseCluster.start(); + + // create tables if needed + HBaseTestingUtility.createPreSplitLoadTestTable(conf, TABLE_NAME, + LoadTestTool.COLUMN_FAMILY); + + LOG.debug("Loading data....\n\n"); + loadData(); + + LOG.debug("Sleeping for " + SLEEP_SEC_AFTER_DATA_LOAD + + " seconds....\n\n"); + Threads.sleep(5 * SLEEP_SEC_AFTER_DATA_LOAD); + + int metaRSPort = HBaseTestingUtility.getMetaRSPort(conf); + + LOG.debug("Killing META region server running on port " + metaRSPort); + hbaseCluster.killRegionServer(metaRSPort); + Threads.sleep(2000); + + LOG.debug("Restarting region server running on port metaRSPort"); + hbaseCluster.startRegionServer(metaRSPort); + Threads.sleep(2000); + + LOG.debug("Trying to scan meta"); + + HTable metaTable = new HTable(conf, HConstants.META_TABLE_NAME); + ResultScanner scanner = metaTable.getScanner(new Scan()); + Result result; + while ((result = scanner.next()) != null) { + LOG.info("Region assignment from META: " + + Bytes.toStringBinary(result.getRow()) + + " => " + + Bytes.toStringBinary(result.getFamilyMap(HConstants.CATALOG_FAMILY) + .get(HConstants.SERVER_QUALIFIER))); + } + } + + @Override + protected void addOptions() { + addRequiredOptWithArg(OPT_HBASE_HOME, "HBase home directory"); + addOptWithArg(OPT_NUM_RS, "Number of Region Servers"); + } + + @Override + protected void processOptions(CommandLine cmd) { + hbaseHome = cmd.getOptionValue(OPT_HBASE_HOME); + if (hbaseHome == null || !new File(hbaseHome).isDirectory()) { + throw new IllegalArgumentException("Invalid HBase home directory: " + + hbaseHome); + } + + LOG.info("Using HBase home directory " + hbaseHome); + numRegionServers = Integer.parseInt(cmd.getOptionValue(OPT_NUM_RS, + String.valueOf(DEFAULT_NUM_RS))); + } + + public static void main(String[] args) { + new RestartMetaTest().doStaticMain(args); + } + +} diff --git a/src/test/java/org/apache/hadoop/hbase/util/TestBytes.java b/src/test/java/org/apache/hadoop/hbase/util/TestBytes.java index e153fea9ecf..9625c72c564 100644 --- a/src/test/java/org/apache/hadoop/hbase/util/TestBytes.java +++ b/src/test/java/org/apache/hadoop/hbase/util/TestBytes.java @@ -25,6 +25,7 @@ import java.io.DataInputStream; import java.io.DataOutputStream; import java.io.IOException; import java.math.BigDecimal; +import java.math.BigInteger; import java.util.Arrays; import junit.framework.TestCase; @@ -163,7 +164,7 @@ public class TestBytes extends TestCase { assertEquals(decimals[i], Bytes.toBigDecimal(b2, 1, b.length)); } } - + private byte [] bytesWithOffset(byte [] src) { // add one byte in front to test offset byte [] result = new byte[src.length + 1]; @@ 
-171,7 +172,7 @@ public class TestBytes extends TestCase { System.arraycopy(src, 0, result, 1, src.length); return result; } - + public void testBinarySearch() throws Exception { byte [][] arr = { {1}, diff --git a/src/test/java/org/apache/hadoop/hbase/util/TestLoadTestKVGenerator.java b/src/test/java/org/apache/hadoop/hbase/util/TestLoadTestKVGenerator.java new file mode 100644 index 00000000000..55948b840ad --- /dev/null +++ b/src/test/java/org/apache/hadoop/hbase/util/TestLoadTestKVGenerator.java @@ -0,0 +1,74 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with this + * work for additional information regarding copyright ownership. The ASF + * licenses this file to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations + * under the License. + */ +package org.apache.hadoop.hbase.util; + +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertTrue; + +import java.util.HashSet; +import java.util.Random; +import java.util.Set; + +import org.apache.hadoop.hbase.SmallTests; +import org.junit.Test; +import org.junit.experimental.categories.Category; + +@Category(SmallTests.class) +public class TestLoadTestKVGenerator { + + private static final int MIN_LEN = 10; + private static final int MAX_LEN = 20; + + private Random rand = new Random(28937293L); + private LoadTestKVGenerator gen = new LoadTestKVGenerator(MIN_LEN, MAX_LEN); + + @Test + public void testValueLength() { + for (int i = 0; i < 1000; ++i) { + byte[] v = gen.generateRandomSizeValue(i, + String.valueOf(rand.nextInt())); + assertTrue(MIN_LEN <= v.length); + assertTrue(v.length <= MAX_LEN); + } + } + + @Test + public void testVerification() { + for (int i = 0; i < 1000; ++i) { + for (int qualIndex = 0; qualIndex < 20; ++qualIndex) { + String qual = String.valueOf(qualIndex); + byte[] v = gen.generateRandomSizeValue(i, qual); + String rowKey = LoadTestKVGenerator.md5PrefixedKey(i); + assertTrue(LoadTestKVGenerator.verify(rowKey, qual, v)); + v[0]++; + assertFalse(LoadTestKVGenerator.verify(rowKey, qual, v)); + } + } + } + + @Test + public void testCorrectAndUniqueKeys() { + Set<String> keys = new HashSet<String>(); + for (int i = 0; i < 1000; ++i) { + String k = LoadTestKVGenerator.md5PrefixedKey(i); + assertFalse(keys.contains(k)); + assertTrue(k.endsWith(":" + i)); + keys.add(k); + } + } + +} diff --git a/src/test/java/org/apache/hadoop/hbase/util/TestMergeTable.java b/src/test/java/org/apache/hadoop/hbase/util/TestMergeTable.java index b7874ef71a5..d33f4576ed0 100644 --- a/src/test/java/org/apache/hadoop/hbase/util/TestMergeTable.java +++ b/src/test/java/org/apache/hadoop/hbase/util/TestMergeTable.java @@ -62,7 +62,7 @@ public class TestMergeTable { * Hand-makes regions of a mergeable size and adds the hand-made regions to * hand-made meta. The hand-made regions are created offline. We then start * up mini cluster, disables the hand-made table and starts in on merging. 
- * @throws Exception + * @throws Exception */ @Test (timeout=300000) public void testMergeTable() throws Exception { // Table we are manually creating offline. @@ -70,7 +70,7 @@ public class TestMergeTable { desc.addFamily(new HColumnDescriptor(COLUMN_NAME)); // Set maximum regionsize down. - UTIL.getConfiguration().setLong("hbase.hregion.max.filesize", 64L * 1024L * 1024L); + UTIL.getConfiguration().setLong(HConstants.HREGION_MAX_FILESIZE, 64L * 1024L * 1024L); // Make it so we don't split. UTIL.getConfiguration().setInt("hbase.regionserver.regionSplitLimit", 0); // Startup hdfs. Its in here we'll be putting our manually made regions. diff --git a/src/test/java/org/apache/hadoop/hbase/util/TestMiniClusterLoadParallel.java b/src/test/java/org/apache/hadoop/hbase/util/TestMiniClusterLoadParallel.java new file mode 100644 index 00000000000..ef7eeefd4ac --- /dev/null +++ b/src/test/java/org/apache/hadoop/hbase/util/TestMiniClusterLoadParallel.java @@ -0,0 +1,58 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with this + * work for additional information regarding copyright ownership. The ASF + * licenses this file to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations + * under the License. + */ +package org.apache.hadoop.hbase.util; + +import static org.junit.Assert.assertEquals; + +import org.apache.hadoop.hbase.LargeTests; +import org.junit.Test; +import org.junit.experimental.categories.Category; +import org.junit.runner.RunWith; +import org.junit.runners.Parameterized; + +/** + * A write/read/verify load test on a mini HBase cluster. Tests reading + * and writing at the same time. + */ +@Category(LargeTests.class) +@RunWith(Parameterized.class) +public class TestMiniClusterLoadParallel + extends TestMiniClusterLoadSequential { + + public TestMiniClusterLoadParallel(boolean isMultiPut) { + super(isMultiPut); + } + + @Test(timeout=120000) + public void loadTest() throws Exception { + prepareForLoadTest(); + + readerThreads.linkToWriter(writerThreads); + + writerThreads.start(0, NUM_KEYS, NUM_THREADS); + readerThreads.start(0, NUM_KEYS, NUM_THREADS); + + writerThreads.waitForFinish(); + readerThreads.waitForFinish(); + + assertEquals(0, writerThreads.getNumWriteFailures()); + assertEquals(0, readerThreads.getNumReadFailures()); + assertEquals(0, readerThreads.getNumReadErrors()); + assertEquals(NUM_KEYS, readerThreads.getNumUniqueKeysVerified()); + } + +} diff --git a/src/test/java/org/apache/hadoop/hbase/util/TestMiniClusterLoadSequential.java b/src/test/java/org/apache/hadoop/hbase/util/TestMiniClusterLoadSequential.java new file mode 100644 index 00000000000..de7f473499d --- /dev/null +++ b/src/test/java/org/apache/hadoop/hbase/util/TestMiniClusterLoadSequential.java @@ -0,0 +1,116 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with this + * work for additional information regarding copyright ownership. 
The ASF + * licenses this file to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations + * under the License. + */ +package org.apache.hadoop.hbase.util; + +import static org.junit.Assert.assertEquals; + +import java.io.IOException; +import java.util.Collection; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hbase.HBaseTestingUtility; +import org.apache.hadoop.hbase.LargeTests; +import org.apache.hadoop.hbase.client.HBaseAdmin; +import org.junit.After; +import org.junit.Before; +import org.junit.Test; +import org.junit.experimental.categories.Category; +import org.junit.runner.RunWith; +import org.junit.runners.Parameterized; +import org.junit.runners.Parameterized.Parameters; + +/** + * A write/read/verify load test on a mini HBase cluster. Tests writing + * and then reading. + */ +@Category(LargeTests.class) +@RunWith(Parameterized.class) +public class TestMiniClusterLoadSequential { + + private static final Log LOG = LogFactory.getLog( + TestMiniClusterLoadSequential.class); + + protected static final byte[] TABLE = Bytes.toBytes("load_test_tbl"); + protected static final byte[] CF = Bytes.toBytes("load_test_cf"); + protected static final long NUM_KEYS = 10000; + protected static final int NUM_THREADS = 8; + protected static final int NUM_RS = 2; + protected static final HBaseTestingUtility TEST_UTIL = + new HBaseTestingUtility(); + + protected final Configuration conf = TEST_UTIL.getConfiguration(); + protected final boolean isMultiPut; + + protected MultiThreadedWriter writerThreads; + protected MultiThreadedReader readerThreads; + + public TestMiniClusterLoadSequential(boolean isMultiPut) { + this.isMultiPut = isMultiPut; + } + + @Parameters + public static Collection<Object[]> parameters() { + return HBaseTestingUtility.BOOLEAN_PARAMETERIZED; + } + + @Before + public void setUp() throws Exception { + LOG.debug("Test setup: isMultiPut=" + isMultiPut); + TEST_UTIL.startMiniCluster(1, NUM_RS); + } + + @After + public void tearDown() throws Exception { + LOG.debug("Test teardown: isMultiPut=" + isMultiPut); + TEST_UTIL.shutdownMiniCluster(); + } + + @Test(timeout=120000) + public void loadTest() throws Exception { + prepareForLoadTest(); + + writerThreads.start(0, NUM_KEYS, NUM_THREADS); + writerThreads.waitForFinish(); + assertEquals(0, writerThreads.getNumWriteFailures()); + + readerThreads.start(0, NUM_KEYS, NUM_THREADS); + readerThreads.waitForFinish(); + assertEquals(0, readerThreads.getNumReadFailures()); + assertEquals(0, readerThreads.getNumReadErrors()); + assertEquals(NUM_KEYS, readerThreads.getNumKeysVerified()); + } + + protected void prepareForLoadTest() throws IOException { + HBaseAdmin admin = new HBaseAdmin(conf); + while (admin.getClusterStatus().getServers().size() < NUM_RS) { + LOG.info("Sleeping until " + NUM_RS + " RSs are online"); + Threads.sleepWithoutInterrupt(1000); + } + admin.close(); + + int numRegions = + HBaseTestingUtility.createPreSplitLoadTestTable(conf, TABLE, CF); + 
TEST_UTIL.waitUntilAllRegionsAssigned(numRegions); + + writerThreads = new MultiThreadedWriter(conf, TABLE, CF); + writerThreads.setMultiPut(isMultiPut); + readerThreads = new MultiThreadedReader(conf, TABLE, CF, 100); + } + +}
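
For reference, the load-test utilities added by this patch can also be driven programmatically. Below is a minimal, hypothetical sketch (not part of the patch) that mirrors the concurrent write/read flow of TestMiniClusterLoadParallel against an already-running cluster. The class name, table name, column family, key range, and thread count are arbitrary placeholders; it assumes HBaseConfiguration.create() picks up a valid client hbase-site.xml, that the table already exists (e.g. created via HBaseTestingUtility.createPreSplitLoadTestTable), and that the src/test classes are on the classpath.

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.hbase.HBaseConfiguration;
    import org.apache.hadoop.hbase.util.Bytes;
    import org.apache.hadoop.hbase.util.MultiThreadedReader;
    import org.apache.hadoop.hbase.util.MultiThreadedWriter;

    public class LoadSketch {
      public static void main(String[] args) throws Exception {
        Configuration conf = HBaseConfiguration.create();
        // Placeholder table and column family names.
        byte[] table = Bytes.toBytes("load_test_tbl");
        byte[] cf = Bytes.toBytes("load_test_cf");

        // Write keys [0, 10000) from 10 client threads using multi-puts.
        MultiThreadedWriter writer = new MultiThreadedWriter(conf, table, cf);
        writer.setMultiPut(true);
        writer.start(0, 10000, 10);

        // Read concurrently, trailing the writer; the last constructor
        // argument matches the value used in TestMiniClusterLoadSequential.
        MultiThreadedReader reader = new MultiThreadedReader(conf, table, cf, 100);
        reader.linkToWriter(writer);
        reader.start(0, 10000, 10);

        writer.waitForFinish();
        reader.waitForFinish();

        System.out.println("Write failures: " + writer.getNumWriteFailures());
        System.out.println("Read failures: " + reader.getNumReadFailures());
        System.out.println("Read errors: " + reader.getNumReadErrors());
      }
    }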