[jira] [HBASE-5684] Make ProcessBasedLocalHBaseCluster run HDFS and make it more robust

Summary:
Currently, ProcessBasedLocalHBaseCluster runs on top of the raw local filesystem. We
need it to start a process-based HDFS cluster as well, and we need to make the
whole thing stable enough to use in unit tests.

Also, the logs of all local HBase cluster daemons are now tailed into the primary log
of the unit test, with appropriate prefixes, to make debugging easier.

This is a trunk diff. The 89-fb version is D2709.
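
For reference, here is a minimal sketch of the new cluster lifecycle, mirroring the
new TestProcessBasedCluster added in this diff (2 datanodes and 3 region servers are
example values):

  HBaseTestingUtility testUtil = new HBaseTestingUtility();
  ProcessBasedLocalHBaseCluster cluster =
      new ProcessBasedLocalHBaseCluster(testUtil.getConfiguration(), 2, 3);
  cluster.startMiniDFS();   // must be called before startHBase()
  cluster.startHBase();     // starts ZK, master(s) and region servers as separate processes
  try {
    // ... run test logic against cluster.getConf() ...
  } finally {
    cluster.shutdown();     // kills the daemons via their pid files and stops mini-DFS
  }

While the cluster is up, each daemon's .log and .out files are tailed into the test
output with a prefix such as [RS:<rpc port>] identifying the server type and port.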

Test Plan:
Run the new unit test multiple times (10x or 50x).
Run all unit tests.

Reviewers: tedyu, stack, lhofhansl, nspiegelberg, amirshim, JIRA

Reviewed By: tedyu

Differential Revision: https://reviews.facebook.net/D2757

git-svn-id: https://svn.apache.org/repos/asf/hbase/trunk@1326036 13f79535-47bb-0310-9956-ffa450edef68
mbautin 2012-04-14 01:25:27 +00:00
parent b4a1bb164b
commit a2c1c0cd3e
15 changed files with 643 additions and 114 deletions

View File

@ -0,0 +1,90 @@
#!/bin/bash
set -e -u -o pipefail
SCRIPT_NAME=${0##*/}
SCRIPT_DIR=$(cd `dirname $0` && pwd )
print_usage() {
cat >&2 <<EOT
Usage: $SCRIPT_NAME <options>
Options:
--kill
Kill local process-based HBase cluster using pid files.
--show
Show HBase processes running on this machine
EOT
exit 1
}
show_processes() {
ps -ef | grep -P "(HRegionServer|HMaster|HQuorumPeer) start" | grep -v grep
}
cmd_specified() {
if [ "$CMD_SPECIFIED" ]; then
echo "Only one command can be specified" >&2
exit 1
fi
CMD_SPECIFIED=1
}
list_pid_files() {
LOCAL_CLUSTER_DIR=$SCRIPT_DIR/../../target/local_cluster
LOCAL_CLUSTER_DIR=$( cd $LOCAL_CLUSTER_DIR && pwd )
find $LOCAL_CLUSTER_DIR -name "*.pid"
}
if [ $# -eq 0 ]; then
print_usage
fi
IS_KILL=""
IS_SHOW=""
CMD_SPECIFIED=""
while [ $# -ne 0 ]; do
case "$1" in
-h|--help)
print_usage ;;
--kill)
IS_KILL=1
cmd_specified ;;
--show)
IS_SHOW=1
cmd_specified ;;
*)
echo "Invalid option: $1" >&2
exit 1
esac
shift
done
if [ "$IS_KILL" ]; then
list_pid_files | \
while read F; do
PID=`cat $F`
echo "Killing pid $PID from file $F"
# Kill may fail but that's OK, so turn off error handling for a moment.
set +e
kill -9 $PID
set -e
done
elif [ "$IS_SHOW" ]; then
PIDS=""
for F in `list_pid_files`; do
PID=`cat $F`
if [ -n "$PID" ]; then
if [ -n "$PIDS" ]; then
PIDS="$PIDS,"
fi
PIDS="$PIDS$PID"
fi
done
ps -p $PIDS
else
echo "No command specified" >&2
exit 1
fi

View File

@ -82,7 +82,7 @@ public final class HConstants {
/** Cluster is fully-distributed */
public static final boolean CLUSTER_IS_DISTRIBUTED = true;
/** Default value for cluster distributed mode */
public static final boolean DEFAULT_CLUSTER_DISTRIBUTED = CLUSTER_IS_LOCAL;
/** default host address */
@ -97,6 +97,9 @@ public final class HConstants {
/** default port for master web api */
public static final int DEFAULT_MASTER_INFOPORT = 60010;
/** Configuration key for master web API port */
public static final String MASTER_INFO_PORT = "hbase.master.info.port";
/** Parameter name for the master type being backup (waits for primary to go inactive). */
public static final String MASTER_TYPE_BACKUP = "hbase.master.backup";
@ -173,9 +176,13 @@ public final class HConstants {
/** default port for region server web api */
public static final int DEFAULT_REGIONSERVER_INFOPORT = 60030;
/** A configuration key for regionserver info port */
public static final String REGIONSERVER_INFO_PORT =
"hbase.regionserver.info.port";
/** A flag that enables automatic selection of regionserver info port */
public static final String REGIONSERVER_INFO_PORT_AUTO =
"hbase.regionserver.info.port.auto";
REGIONSERVER_INFO_PORT + ".auto";
/** Parameter name for what region server interface to use. */
public static final String REGION_SERVER_CLASS = "hbase.regionserver.class";
@ -194,13 +201,13 @@ public final class HConstants {
/** Default value for thread wake frequency */
public static final int DEFAULT_THREAD_WAKE_FREQUENCY = 10 * 1000;
/** Parameter name for how often we should try to write a version file, before failing */
public static final String VERSION_FILE_WRITE_ATTEMPTS = "hbase.server.versionfile.writeattempts";
/** Default number of times we should try to write a version file, before failing */
public static final int DEFAULT_VERSION_FILE_WRITE_ATTEMPTS = 3;
/** Parameter name for how often a region should perform a major compaction */
public static final String MAJOR_COMPACTION_PERIOD = "hbase.hregion.majorcompaction";
@ -308,11 +315,11 @@ public final class HConstants {
/** The RegionInfo qualifier as a string */
public static final String REGIONINFO_QUALIFIER_STR = "regioninfo";
/** The regioninfo column qualifier */
public static final byte [] REGIONINFO_QUALIFIER =
Bytes.toBytes(REGIONINFO_QUALIFIER_STR);
/** The server column qualifier */
public static final byte [] SERVER_QUALIFIER = Bytes.toBytes("server");
@ -618,10 +625,10 @@ public final class HConstants {
/** Host name of the local machine */
public static final String LOCALHOST = "localhost";
/**
* If this parameter is set to true, then hbase will read
* data and then verify checksums. Checksum verification
* inside hdfs will be switched off. However, if the hbase-checksum
* verification fails, then it will switch back to using
* hdfs checksums for verifying data that is being read from storage.
*
@ -629,9 +636,15 @@ public final class HConstants {
* verify any checksums, instead it will depend on checksum verification
* being done in the hdfs client.
*/
public static final String HBASE_CHECKSUM_VERIFICATION =
"hbase.regionserver.checksum.verify";
public static final String LOCALHOST_IP = "127.0.0.1";
/** Conf key that enables distributed log splitting */
public static final String DISTRIBUTED_LOG_SPLITTING_KEY =
"hbase.master.distributed.log.splitting";
/**
* The name of the configuration parameter that specifies
* the number of bytes in a newly created checksum chunk.

View File

@ -881,7 +881,7 @@ Server {
Threads.setDaemonThreadRunning(logCleaner.getThread(), n + ".oldLogCleaner");
// Put up info server.
int port = this.conf.getInt("hbase.master.info.port", 60010);
int port = this.conf.getInt(HConstants.MASTER_INFO_PORT, 60010);
if (port >= 0) {
String a = this.conf.get("hbase.master.info.bindAddress", "0.0.0.0");
this.infoServer = new InfoServer(MASTER, a, port, false, this.conf);

View File

@ -101,7 +101,7 @@ public class MasterFileSystem {
conf.set("fs.default.name", fsUri);
conf.set("fs.defaultFS", fsUri);
this.distributedLogSplitting =
conf.getBoolean("hbase.master.distributed.log.splitting", true);
conf.getBoolean(HConstants.DISTRIBUTED_LOG_SPLITTING_KEY, true);
if (this.distributedLogSplitting) {
this.splitLogManager = new SplitLogManager(master.getZooKeeper(),
master.getConfiguration(), master, master.getServerName().toString());

View File

@ -1397,7 +1397,7 @@ public class HRegionServer extends RegionServer
* @throws IOException
*/
private int putUpWebUI() throws IOException {
int port = this.conf.getInt("hbase.regionserver.info.port", 60030);
int port = this.conf.getInt(HConstants.REGIONSERVER_INFO_PORT, 60030);
// -1 is for disabling info server
if (port < 0) return port;
String addr = this.conf.get("hbase.regionserver.info.bindAddress", "0.0.0.0");

View File

@ -35,10 +35,12 @@ import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.NavigableSet;
import java.util.Random;
import java.util.Set;
import java.util.UUID;
import org.apache.commons.logging.Log;
@ -159,6 +161,9 @@ public class HBaseTestingUtility {
/** Filesystem URI used for map-reduce mini-cluster setup */
private static String FS_URI;
/** A set of ports that have been claimed using {@link #randomFreePort()}. */
private static final Set<Integer> takenRandomPorts = new HashSet<Integer>();
/** Compression algorithms to use in parameterized JUnit 4 tests */
public static final List<Object[]> COMPRESSION_ALGORITHMS_PARAMETERIZED =
Arrays.asList(new Object[][] {
@ -1923,7 +1928,10 @@ public class HBaseTestingUtility {
Bytes.toBytes(String.format(keyFormat, splitStartKey)),
Bytes.toBytes(String.format(keyFormat, splitEndKey)),
numRegions);
hbaseCluster.flushcache(HConstants.META_TABLE_NAME);
if (hbaseCluster != null) {
hbaseCluster.flushcache(HConstants.META_TABLE_NAME);
}
for (int iFlush = 0; iFlush < numFlushes; ++iFlush) {
for (int iRow = 0; iRow < numRowsPerFlush; ++iRow) {
@ -1958,7 +1966,9 @@ public class HBaseTestingUtility {
}
LOG.info("Initiating flush #" + iFlush + " for table " + tableName);
table.flushCommits();
hbaseCluster.flushcache(tableNameBytes);
if (hbaseCluster != null) {
hbaseCluster.flushcache(tableNameBytes);
}
}
return table;
@ -1976,10 +1986,19 @@ public class HBaseTestingUtility {
+ new Random().nextInt(MAX_RANDOM_PORT - MIN_RANDOM_PORT);
}
/**
* Returns a random free port and marks that port as taken. Not thread-safe. Expected to be
* called from single-threaded test setup code.
*/
public static int randomFreePort() {
int port = 0;
do {
port = randomPort();
if (takenRandomPorts.contains(port)) {
continue;
}
takenRandomPorts.add(port);
try {
ServerSocket sock = new ServerSocket(port);
sock.close();
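
The availability check in the loop above (truncated by the diff view) is the standard
bind-and-close probe. A minimal self-contained sketch of that pattern, using only the
JDK:

  import java.io.IOException;
  import java.net.ServerSocket;

  public final class PortProbe {
    /** Returns true if the port could be bound, i.e. it was free at probe time. */
    public static boolean isPortFree(int port) {
      try {
        new ServerSocket(port).close(); // binding succeeds only if the port is free
        return true;
      } catch (IOException e) {
        return false;
      }
    }
  }

The probe is inherently racy (the port can be taken between the check and actual use);
recording claimed ports in takenRandomPorts additionally keeps the same port from
being handed out twice within one test JVM.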

View File

@ -0,0 +1,42 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with this
* work for additional information regarding copyright ownership. The ASF
* licenses this file to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations
* under the License.
*/
package org.apache.hadoop.hbase;
import java.util.Arrays;
import java.util.HashSet;
import java.util.Set;
import java.util.Collections;
import org.apache.hadoop.hbase.util.Bytes;
/** Similar to {@link HConstants} but for tests. */
public class HTestConst {
private HTestConst() {
}
public static final String DEFAULT_TABLE_STR = "MyTestTable";
public static final byte[] DEFAULT_TABLE_BYTES = Bytes.toBytes(DEFAULT_TABLE_STR);
public static final String DEFAULT_CF_STR = "MyDefaultCF";
public static final byte[] DEFAULT_CF_BYTES = Bytes.toBytes(DEFAULT_CF_STR);
public static final Set<String> DEFAULT_CF_STR_SET =
Collections.unmodifiableSet(new HashSet<String>(
Arrays.asList(new String[] { DEFAULT_CF_STR })));
}

View File

@ -19,6 +19,8 @@
*/
package org.apache.hadoop.hbase;
import static org.junit.Assert.assertTrue;
import java.io.BufferedInputStream;
import java.io.IOException;
import java.net.URL;
@ -31,8 +33,6 @@ import org.junit.BeforeClass;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import static org.junit.Assert.assertTrue;
/**
* In testing, info servers are disabled. This test enables them and checks that
* they serve pages.
@ -46,8 +46,8 @@ public class TestInfoServers {
public static void beforeClass() throws Exception {
// The info servers do not run in tests by default.
// Set them to ephemeral ports so they will start
UTIL.getConfiguration().setInt("hbase.master.info.port", 0);
UTIL.getConfiguration().setInt("hbase.regionserver.info.port", 0);
UTIL.getConfiguration().setInt(HConstants.MASTER_INFO_PORT, 0);
UTIL.getConfiguration().setInt(HConstants.REGIONSERVER_INFO_PORT, 0);
UTIL.startMiniCluster();
}
@ -113,4 +113,3 @@ public class TestInfoServers {
public org.apache.hadoop.hbase.ResourceCheckerJUnitRule cu =
new org.apache.hadoop.hbase.ResourceCheckerJUnitRule();
}

View File

@ -29,12 +29,12 @@ import java.util.Iterator;
import java.util.List;
import java.util.NavigableSet;
import java.util.TreeSet;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.Executors;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicLong;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
@ -54,8 +54,8 @@ import org.apache.hadoop.hbase.regionserver.wal.WALEdit;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
import org.apache.hadoop.hbase.util.FSUtils;
import org.apache.hadoop.hbase.util.Threads;
import org.apache.hadoop.hbase.util.JVMClusterUtil.RegionServerThread;
import org.apache.hadoop.hbase.util.Threads;
import org.apache.hadoop.hbase.zookeeper.ZKAssign;
import org.apache.hadoop.hbase.zookeeper.ZKSplitLog;
import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher;
@ -89,6 +89,9 @@ public class TestDistributedLogSplitting {
conf.getLong("hbase.splitlog.max.resubmit", 0);
// Make the failure test faster
conf.setInt("zookeeper.recovery.retry", 0);
conf.setInt(HConstants.REGIONSERVER_INFO_PORT, -1);
conf.setFloat(HConstants.LOAD_BALANCER_SLOP_KEY, (float) 100.0); // no load balancing
conf.setBoolean(HConstants.DISTRIBUTED_LOG_SPLITTING_KEY, true);
TEST_UTIL = new HBaseTestingUtility(conf);
TEST_UTIL.startMiniCluster(NUM_MASTERS, num_rs);
cluster = TEST_UTIL.getHBaseCluster();
@ -159,7 +162,7 @@ public class TestDistributedLogSplitting {
FileSystem fs = master.getMasterFileSystem().getFileSystem();
List<RegionServerThread> rsts = cluster.getLiveRegionServerThreads();
Path rootdir = FSUtils.getRootDir(conf);
installTable(new ZooKeeperWatcher(conf, "table-creation", null),
@ -174,7 +177,7 @@ public class TestDistributedLogSplitting {
}
final Path logDir = new Path(rootdir, HLog.getHLogDirectoryName(hrs
.getServerName().toString()));
LOG.info("#regions = " + regions.size());
Iterator<HRegionInfo> it = regions.iterator();
while (it.hasNext()) {
@ -250,19 +253,19 @@ public class TestDistributedLogSplitting {
long waitTime = 80000;
long endt = curt + waitTime;
while (curt < endt) {
if ((tot_wkr_task_resigned.get() + tot_wkr_task_err.get() +
tot_wkr_final_transistion_failed.get() + tot_wkr_task_done.get() +
tot_wkr_preempt_task.get()) == 0) {
Thread.yield();
curt = System.currentTimeMillis();
} else {
assertEquals(1, (tot_wkr_task_resigned.get() + tot_wkr_task_err.get() +
tot_wkr_final_transistion_failed.get() + tot_wkr_task_done.get() +
tot_wkr_preempt_task.get()));
return;
}
}
fail("none of the following counters went up in " + waitTime +
fail("none of the following counters went up in " + waitTime +
" milliseconds - " +
"tot_wkr_task_resigned, tot_wkr_task_err, " +
"tot_wkr_final_transistion_failed, tot_wkr_task_done, " +
@ -483,4 +486,3 @@ public class TestDistributedLogSplitting {
public org.apache.hadoop.hbase.ResourceCheckerJUnitRule cu =
new org.apache.hadoop.hbase.ResourceCheckerJUnitRule();
}

View File

@ -28,6 +28,7 @@ import java.util.List;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.hbase.HBaseTestCase;
import org.apache.hadoop.hbase.HBaseTestingUtility;
import org.apache.hadoop.hbase.HColumnDescriptor;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.HRegionInfo;
@ -44,7 +45,6 @@ import org.apache.hadoop.hbase.filter.Filter;
import org.apache.hadoop.hbase.filter.InclusiveStopFilter;
import org.apache.hadoop.hbase.filter.PrefixFilter;
import org.apache.hadoop.hbase.filter.WhileMatchFilter;
import org.apache.hadoop.hbase.io.hfile.Compression;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.Writables;
import org.junit.experimental.categories.Category;
@ -86,7 +86,7 @@ public class TestScanner extends HBaseTestCase {
private HRegion r;
private HRegionIncommon region;
private byte[] firstRowBytes, secondRowBytes, thirdRowBytes;
final private byte[] col1, col2;
@ -270,7 +270,7 @@ public class TestScanner extends HBaseTestCase {
// Store some new information
String address = "www.example.com:1234";
String address = HConstants.LOCALHOST_IP + ":" + HBaseTestingUtility.randomFreePort();
put = new Put(ROW_KEY, System.currentTimeMillis(), null);
put.add(HConstants.CATALOG_FAMILY, HConstants.SERVER_QUALIFIER,
@ -607,4 +607,3 @@ public class TestScanner extends HBaseTestCase {
public org.apache.hadoop.hbase.ResourceCheckerJUnitRule cu =
new org.apache.hadoop.hbase.ResourceCheckerJUnitRule();
}

View File

@ -0,0 +1,68 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with this
* work for additional information regarding copyright ownership. The ASF
* licenses this file to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations
* under the License.
*/
package org.apache.hadoop.hbase.util;
import java.net.URL;
import org.apache.hadoop.hbase.HConstants;
/** Determines HBase home path from either class or jar directory */
public class HBaseHomePath {
private static final String TARGET_CLASSES = "/target/classes";
private static final String JAR_SUFFIX = ".jar!";
private static final String FILE_PREFIX = "file:";
private HBaseHomePath() {
}
public static String getHomePath() {
String className = HConstants.class.getName(); // This could have been any HBase class.
String relPathForClass = className.replace(".", "/") + ".class";
URL url = ClassLoader.getSystemResource(relPathForClass);
relPathForClass = "/" + relPathForClass;
if (url == null) {
throw new RuntimeException("Could not lookup class location for " + className);
}
String path = url.getPath();
if (!path.endsWith(relPathForClass)) {
throw new RuntimeException("Got invalid path trying to look up class " + className +
": " + path);
}
path = path.substring(0, path.length() - relPathForClass.length());
if (path.startsWith(FILE_PREFIX)) {
path = path.substring(FILE_PREFIX.length());
}
if (path.endsWith(TARGET_CLASSES)) {
path = path.substring(0, path.length() - TARGET_CLASSES.length());
} else if (path.endsWith(JAR_SUFFIX)) {
int slashIndex = path.lastIndexOf("/");
if (slashIndex == -1) {
throw new RuntimeException("Expected to find slash in jar path " + path);
}
path = path.substring(0, slashIndex);
} else {
throw new RuntimeException("Cannot identify HBase source directory or installation path " +
"from " + path);
}
return path;
}
}
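
A worked example of the suffix stripping above, on hypothetical paths (neither path
is taken from the patch):

  // Demonstrates the two branches of HBaseHomePath.getHomePath().
  public class HomePathDemo {
    public static void main(String[] args) {
      String rel = "/org/apache/hadoop/hbase/HConstants.class";

      // Build-tree case: .../target/classes/<class file> -> source checkout root.
      String fromClasses = "/home/user/hbase/target/classes" + rel;
      String home = fromClasses.substring(0, fromClasses.length() - rel.length());
      home = home.substring(0, home.length() - "/target/classes".length());
      System.out.println(home); // prints /home/user/hbase

      // Jar case: .../some.jar!/<class file> -> directory containing the jar.
      String fromJar = "/opt/hbase/lib/hbase.jar!" + rel;
      String jarPath = fromJar.substring(0, fromJar.length() - rel.length());
      System.out.println(jarPath.substring(0, jarPath.lastIndexOf('/'))); // /opt/hbase/lib
    }
  }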

View File

@ -19,46 +19,52 @@ package org.apache.hadoop.hbase.util;
import java.io.BufferedReader;
import java.io.BufferedWriter;
import java.io.File;
import java.io.FileInputStream;
import java.io.FileNotFoundException;
import java.io.FileWriter;
import java.io.FilenameFilter;
import java.io.IOException;
import java.io.InputStreamReader;
import java.io.PrintStream;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Scanner;
import java.util.Set;
import java.util.TreeMap;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
import org.apache.commons.io.FileUtils;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.RawLocalFileSystem;
import org.apache.hadoop.hbase.HBaseTestingUtility;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.LargeTests;
import org.apache.hadoop.hbase.MiniHBaseCluster;
import org.apache.hadoop.hbase.client.HTable;
import org.apache.hadoop.hbase.zookeeper.ZKUtil;
import org.apache.hadoop.hdfs.MiniDFSCluster;
import org.junit.experimental.categories.Category;
/**
* A helper class for process-based mini-cluster tests. Unlike
* {@link MiniHBaseCluster}, it starts the daemons as separate processes, which
* makes real kill testing possible.
*/
@Category(LargeTests.class)
public class ProcessBasedLocalHBaseCluster {
private static final String DEFAULT_WORKDIR =
"/tmp/hbase-" + System.getenv("USER");
private final String hbaseHome;
private final String workDir;
private int numRegionServers;
private final int zkClientPort;
private final int masterPort;
private final String hbaseHome, workDir;
private final Configuration conf;
private final int numMasters, numRegionServers, numDataNodes;
private final List<Integer> rsPorts, masterPorts;
private final int zkClientPort;
private static final int MAX_FILE_SIZE_OVERRIDE = 10 * 1000 * 1000;
@ -72,39 +78,93 @@ public class ProcessBasedLocalHBaseCluster {
private String hbaseDaemonScript;
private MiniDFSCluster dfsCluster;
private HBaseTestingUtility testUtil;
private Thread logTailerThread;
private List<String> logTailDirs = Collections.synchronizedList(new ArrayList<String>());
private static enum ServerType {
MASTER("master"),
RS("regionserver"),
ZK("zookeeper");
private final String fullName;
private ServerType(String fullName) {
this.fullName = fullName;
}
}
/**
* Constructor. Modifies the passed configuration.
* @param hbaseHome the top directory of the HBase source tree
*/
public ProcessBasedLocalHBaseCluster(Configuration conf, String hbaseHome,
int numRegionServers) {
public ProcessBasedLocalHBaseCluster(Configuration conf,
int numDataNodes, int numRegionServers) {
this.conf = conf;
this.hbaseHome = hbaseHome;
this.hbaseHome = HBaseHomePath.getHomePath();
this.numMasters = 1;
this.numRegionServers = numRegionServers;
this.workDir = DEFAULT_WORKDIR;
this.workDir = hbaseHome + "/target/local_cluster";
this.numDataNodes = numDataNodes;
hbaseDaemonScript = hbaseHome + "/bin/hbase-daemon.sh";
zkClientPort = HBaseTestingUtility.randomFreePort();
masterPort = HBaseTestingUtility.randomFreePort();
this.rsPorts = sortedPorts(numRegionServers);
this.masterPorts = sortedPorts(numMasters);
conf.set(HConstants.ZOOKEEPER_QUORUM, HConstants.LOCALHOST);
conf.setInt(HConstants.ZOOKEEPER_CLIENT_PORT, zkClientPort);
}
public void start() throws IOException {
/**
* Makes this local HBase cluster use a mini-DFS cluster. Must be called before
* {@link #startHBase()}.
* @throws IOException
*/
public void startMiniDFS() throws Exception {
if (testUtil == null) {
testUtil = new HBaseTestingUtility(conf);
}
dfsCluster = testUtil.startMiniDFSCluster(numDataNodes);
}
/**
* Generates a list of random port numbers in sorted order. A sorted
* order makes sense if we ever want to refer to these servers by their index
* in the returned array, e.g. server #0, #1, etc.
*/
private static List<Integer> sortedPorts(int n) {
List<Integer> ports = new ArrayList<Integer>(n);
for (int i = 0; i < n; ++i) {
ports.add(HBaseTestingUtility.randomFreePort());
}
Collections.sort(ports);
return ports;
}
public void startHBase() throws IOException {
startDaemonLogTailer();
cleanupOldState();
// start ZK
LOG.info("Starting ZooKeeper");
LOG.info("Starting ZooKeeper on port " + zkClientPort);
startZK();
HBaseTestingUtility.waitForHostPort(HConstants.LOCALHOST, zkClientPort);
startMaster();
for (int masterPort : masterPorts) {
startMaster(masterPort);
}
ZKUtil.waitForBaseZNode(conf);
for (int idx = 0; idx < numRegionServers; idx++) {
startRegionServer(HBaseTestingUtility.randomFreePort());
for (int rsPort : rsPorts) {
startRegionServer(rsPort);
}
LOG.info("Waiting for HBase startup by scanning META");
@ -124,32 +184,32 @@ public class ProcessBasedLocalHBaseCluster {
}
public void startRegionServer(int port) {
startServer("regionserver", port);
startServer(ServerType.RS, port);
}
public void startMaster() {
startServer("master", 0);
public void startMaster(int port) {
startServer(ServerType.MASTER, port);
}
public void killRegionServer(int port) throws IOException {
killServer("regionserver", port);
killServer(ServerType.RS, port);
}
public void killMaster() throws IOException {
killServer("master", 0);
killServer(ServerType.MASTER, 0);
}
public void startZK() {
startServer("zookeeper", 0);
startServer(ServerType.ZK, 0);
}
private void executeCommand(String command) {
ensureShutdownHookInstalled();
executeCommand(command, null);
}
private void executeCommand(String command, Map<String,
String> envOverrides) {
ensureShutdownHookInstalled();
LOG.debug("Command : " + command);
try {
@ -195,7 +255,7 @@ public class ProcessBasedLocalHBaseCluster {
try {
pid = readPidFromFile(pidFile);
} catch (IOException ex) {
LOG.error("Could not kill process with pid from " + pidFile);
LOG.error("Could not read pid from file " + pidFile);
}
if (pid > 0) {
@ -203,9 +263,6 @@ public class ProcessBasedLocalHBaseCluster {
killProcess(pid);
}
}
LOG.info("Waiting a bit to let processes terminate");
Threads.sleep(5000);
}
private void ensureShutdownHookInstalled() {
@ -237,18 +294,12 @@ public class ProcessBasedLocalHBaseCluster {
}
}
private String serverWorkingDir(String serverName, int port) {
String dir;
if (serverName.equals("regionserver")) {
dir = workDir + "/" + serverName + "-" + port;
} else {
dir = workDir + "/" + serverName;
}
return dir;
private String serverWorkingDir(ServerType serverType, int port) {
return workDir + "/" + serverType + "-" + port;
}
private int getServerPID(String serverName, int port) throws IOException {
String pidFile = pidFilePath(serverName, port);
private int getServerPID(ServerType serverType, int port) throws IOException {
String pidFile = pidFilePath(serverType, port);
return readPidFromFile(pidFile);
}
@ -261,18 +312,18 @@ public class ProcessBasedLocalHBaseCluster {
}
}
private String pidFilePath(String serverName, int port) {
String dir = serverWorkingDir(serverName, port);
private String pidFilePath(ServerType serverType, int port) {
String dir = serverWorkingDir(serverType, port);
String user = System.getenv("USER");
String pidFile = String.format("%s/hbase-%s-%s.pid",
dir, user, serverName);
dir, user, serverType.fullName);
return pidFile;
}
private void killServer(String serverName, int port) throws IOException {
int pid = getServerPID(serverName, port);
private void killServer(ServerType serverType, int port) throws IOException {
int pid = getServerPID(serverType, port);
if (pid > 0) {
LOG.info("Killing " + serverName + "; pid=" + pid);
LOG.info("Killing " + serverType + "; pid=" + pid);
killProcess(pid);
}
}
@ -282,14 +333,30 @@ public class ProcessBasedLocalHBaseCluster {
executeCommand(cmd);
}
private void startServer(String serverName, int rsPort) {
String conf = generateConfig(rsPort);
private void startServer(ServerType serverType, int rsPort) {
// Create the working directory for this server.
String dir = serverWorkingDir(serverName, rsPort);
executeCommand("mkdir -p " + dir);
String dir = serverWorkingDir(serverType, rsPort);
String confStr = generateConfig(serverType, rsPort, dir);
LOG.debug("Creating directory " + dir);
new File(dir).mkdirs();
writeStringToFile(conf, dir + "/" + "hbase-site.xml");
writeStringToFile(confStr, dir + "/hbase-site.xml");
// Set debug options to an empty string so that hbase-config.sh does not configure them
// using default ports. If we want to run remote debugging on process-based local cluster's
// daemons, we can automatically choose non-conflicting JDWP and JMX ports for each daemon
// and specify them here.
writeStringToFile(
"unset HBASE_MASTER_OPTS\n" +
"unset HBASE_REGIONSERVER_OPTS\n" +
"unset HBASE_ZOOKEEPER_OPTS\n" +
"HBASE_MASTER_DBG_OPTS=' '\n" +
"HBASE_REGIONSERVER_DBG_OPTS=' '\n" +
"HBASE_ZOOKEEPER_DBG_OPTS=' '\n" +
"HBASE_MASTER_JMX_OPTS=' '\n" +
"HBASE_REGIONSERVER_JMX_OPTS=' '\n" +
"HBASE_ZOOKEEPER_JMX_OPTS=' '\n",
dir + "/hbase-env.sh");
Map<String, String> envOverrides = new HashMap<String, String>();
envOverrides.put("HBASE_LOG_DIR", dir);
@ -303,23 +370,42 @@ public class ProcessBasedLocalHBaseCluster {
}
executeCommand(hbaseDaemonScript + " --config " + dir +
" start " + serverName, envOverrides);
daemonPidFiles.add(pidFilePath(serverName, rsPort));
" start " + serverType.fullName, envOverrides);
daemonPidFiles.add(pidFilePath(serverType, rsPort));
logTailDirs.add(dir);
}
private final String generateConfig(int rsPort) {
private final String generateConfig(ServerType serverType, int rpcPort,
String daemonDir) {
StringBuilder sb = new StringBuilder();
Map<String, Object> confMap = new TreeMap<String, Object>();
confMap.put(HConstants.CLUSTER_DISTRIBUTED, true);
if (rsPort > 0) {
confMap.put(HConstants.REGIONSERVER_PORT, rsPort);
confMap.put(HConstants.REGIONSERVER_INFO_PORT_AUTO, true);
if (serverType == ServerType.MASTER) {
confMap.put(HConstants.MASTER_PORT, rpcPort);
int masterInfoPort = HBaseTestingUtility.randomFreePort();
reportWebUIPort("master", masterInfoPort);
confMap.put(HConstants.MASTER_INFO_PORT, masterInfoPort);
} else if (serverType == ServerType.RS) {
confMap.put(HConstants.REGIONSERVER_PORT, rpcPort);
int rsInfoPort = HBaseTestingUtility.randomFreePort();
reportWebUIPort("region server", rsInfoPort);
confMap.put(HConstants.REGIONSERVER_INFO_PORT, rsInfoPort);
} else {
confMap.put(HConstants.ZOOKEEPER_DATA_DIR, daemonDir);
}
confMap.put(HConstants.ZOOKEEPER_CLIENT_PORT, zkClientPort);
confMap.put(HConstants.MASTER_PORT, masterPort);
confMap.put(HConstants.HREGION_MAX_FILESIZE, MAX_FILE_SIZE_OVERRIDE);
confMap.put("fs.file.impl", RawLocalFileSystem.class.getName());
if (dfsCluster != null) {
String fsURL = "hdfs://" + HConstants.LOCALHOST + ":" + dfsCluster.getNameNodePort();
confMap.put("fs.default.name", fsURL);
confMap.put("fs.defaultFS", fsURL);
confMap.put("hbase.rootdir", fsURL + "/hbase_test");
}
sb.append("<configuration>\n");
for (Map.Entry<String, Object> entry : confMap.entrySet()) {
@ -332,8 +418,140 @@ public class ProcessBasedLocalHBaseCluster {
return sb.toString();
}
private static void reportWebUIPort(String daemon, int port) {
LOG.info("Local " + daemon + " web UI is at http://"
+ HConstants.LOCALHOST + ":" + port);
}
public Configuration getConf() {
return conf;
}
public void shutdown() {
if (dfsCluster != null) {
dfsCluster.shutdown();
}
shutdownAllProcesses();
}
private static final Pattern TO_REMOVE_FROM_LOG_LINES_RE =
Pattern.compile("org\\.apache\\.hadoop\\.hbase\\.");
private static final Pattern LOG_PATH_FORMAT_RE =
Pattern.compile("^.*/([A-Z]+)-(\\d+)/[^/]+$");
private static String processLine(String line) {
Matcher m = TO_REMOVE_FROM_LOG_LINES_RE.matcher(line);
return m.replaceAll("");
}
private final class LocalDaemonLogTailer implements Runnable {
private final Set<String> tailedFiles = new HashSet<String>();
private final List<String> dirList = new ArrayList<String>();
private final Object printLock = new Object();
private final FilenameFilter LOG_FILES = new FilenameFilter() {
@Override
public boolean accept(File dir, String name) {
return name.endsWith(".out") || name.endsWith(".log");
}
};
@Override
public void run() {
try {
runInternal();
} catch (IOException ex) {
LOG.error(ex);
}
}
private void runInternal() throws IOException {
Thread.currentThread().setName(getClass().getSimpleName());
while (true) {
scanDirs();
try {
Thread.sleep(500);
} catch (InterruptedException e) {
LOG.error("Log tailer thread interrupted", e);
break;
}
}
}
private void scanDirs() throws FileNotFoundException {
dirList.clear();
dirList.addAll(logTailDirs);
for (String d : dirList) {
for (File f : new File(d).listFiles(LOG_FILES)) {
String filePath = f.getAbsolutePath();
if (!tailedFiles.contains(filePath)) {
tailedFiles.add(filePath);
startTailingFile(filePath);
}
}
}
}
private void startTailingFile(final String filePath) throws FileNotFoundException {
final PrintStream dest = filePath.endsWith(".log") ? System.err : System.out;
final ServerType serverType;
final int serverPort;
Matcher m = LOG_PATH_FORMAT_RE.matcher(filePath);
if (m.matches()) {
serverType = ServerType.valueOf(m.group(1));
serverPort = Integer.valueOf(m.group(2));
} else {
LOG.error("Unrecognized log path format: " + filePath);
return;
}
final String logMsgPrefix =
"[" + serverType + (serverPort != 0 ? ":" + serverPort : "") + "] ";
LOG.debug("Tailing " + filePath);
Thread t = new Thread(new Runnable() {
@Override
public void run() {
try {
FileInputStream fis = new FileInputStream(filePath);
BufferedReader br = new BufferedReader(new InputStreamReader(fis));
String line;
while (true) {
try {
Thread.sleep(200);
} catch (InterruptedException e) {
LOG.error("Tailer for " + filePath + " interrupted");
break;
}
while ((line = br.readLine()) != null) {
line = logMsgPrefix + processLine(line);
synchronized (printLock) {
if (line.endsWith("\n")) {
dest.print(line);
} else {
dest.println(line);
}
dest.flush();
}
}
}
} catch (IOException ex) {
LOG.error("Failed tailing " + filePath, ex);
}
}
});
t.setDaemon(true);
t.setName("Tailer for " + filePath);
t.start();
}
}
private void startDaemonLogTailer() {
logTailerThread = new Thread(new LocalDaemonLogTailer());
logTailerThread.setDaemon(true);
logTailerThread.start();
}
}

View File

@ -16,7 +16,6 @@
*/
package org.apache.hadoop.hbase.util;
import java.io.File;
import java.io.IOException;
import org.apache.commons.cli.CommandLine;
@ -52,12 +51,10 @@ public class RestartMetaTest extends AbstractHBaseTool {
/** The actual number of region servers */
private int numRegionServers;
/** HBase home source tree home directory */
private String hbaseHome;
private static final String OPT_HBASE_HOME = "hbase_home";
private static final String OPT_NUM_RS = "num_rs";
private static final int NUM_DATANODES = 3;
/** Loads data into the table using the multi-threaded writer. */
private void loadData() throws IOException {
long startKey = 0;
@ -89,12 +86,13 @@ public class RestartMetaTest extends AbstractHBaseTool {
}
@Override
protected void doWork() throws IOException {
protected void doWork() throws Exception {
ProcessBasedLocalHBaseCluster hbaseCluster =
new ProcessBasedLocalHBaseCluster(conf, hbaseHome, numRegionServers);
new ProcessBasedLocalHBaseCluster(conf, NUM_DATANODES, numRegionServers);
hbaseCluster.startMiniDFS();
// start the process based HBase cluster
hbaseCluster.start();
hbaseCluster.startHBase();
// create tables if needed
HBaseTestingUtility.createPreSplitLoadTestTable(conf, TABLE_NAME,
@ -134,7 +132,6 @@ public class RestartMetaTest extends AbstractHBaseTool {
@Override
protected void addOptions() {
addRequiredOptWithArg(OPT_HBASE_HOME, "HBase home directory");
addOptWithArg(OPT_NUM_RS, "Number of Region Servers");
addOptWithArg(LoadTestTool.OPT_DATA_BLOCK_ENCODING,
LoadTestTool.OPT_DATA_BLOCK_ENCODING_USAGE);
@ -142,13 +139,6 @@ public class RestartMetaTest extends AbstractHBaseTool {
@Override
protected void processOptions(CommandLine cmd) {
hbaseHome = cmd.getOptionValue(OPT_HBASE_HOME);
if (hbaseHome == null || !new File(hbaseHome).isDirectory()) {
throw new IllegalArgumentException("Invalid HBase home directory: " +
hbaseHome);
}
LOG.info("Using HBase home directory " + hbaseHome);
numRegionServers = Integer.parseInt(cmd.getOptionValue(OPT_NUM_RS,
String.valueOf(DEFAULT_NUM_RS)));
}

View File

@ -93,7 +93,7 @@ public class TestHBaseFsck {
@BeforeClass
public static void setUpBeforeClass() throws Exception {
TEST_UTIL.getConfiguration().setBoolean("hbase.master.distributed.log.splitting", false);
TEST_UTIL.getConfiguration().setBoolean(HConstants.DISTRIBUTED_LOG_SPLITTING_KEY, false);
TEST_UTIL.startMiniCluster(3);
}

View File

@ -0,0 +1,89 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with this
* work for additional information regarding copyright ownership. The ASF
* licenses this file to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations
* under the License.
*/
package org.apache.hadoop.hbase.util;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;
import java.io.File;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.hbase.HBaseTestingUtility;
import org.apache.hadoop.hbase.HColumnDescriptor;
import org.apache.hadoop.hbase.HTestConst;
import org.apache.hadoop.hbase.client.HTable;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.ResultScanner;
import org.junit.Test;
/**
* A basic unit test that spins up a local HBase cluster.
*/
public class TestProcessBasedCluster {
private static final Log LOG = LogFactory.getLog(TestProcessBasedCluster.class);
private static final int COLS_PER_ROW = 5;
private static final int FLUSHES = 5;
private static final int NUM_REGIONS = 5;
private static final int ROWS_PER_FLUSH = 5;
private static final HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility();
@Test(timeout=300 * 1000)
public void testProcessBasedCluster() throws Exception {
ProcessBasedLocalHBaseCluster cluster = new ProcessBasedLocalHBaseCluster(
TEST_UTIL.getConfiguration(), 2, 3);
cluster.startMiniDFS();
cluster.startHBase();
try {
TEST_UTIL.createRandomTable(HTestConst.DEFAULT_TABLE_STR,
HTestConst.DEFAULT_CF_STR_SET,
HColumnDescriptor.DEFAULT_VERSIONS, COLS_PER_ROW, FLUSHES, NUM_REGIONS,
ROWS_PER_FLUSH);
HTable table = new HTable(TEST_UTIL.getConfiguration(), HTestConst.DEFAULT_TABLE_BYTES);
ResultScanner scanner = table.getScanner(HTestConst.DEFAULT_CF_BYTES);
Result result;
int rows = 0;
int cols = 0;
while ((result = scanner.next()) != null) {
++rows;
cols += result.getFamilyMap(HTestConst.DEFAULT_CF_BYTES).size();
}
LOG.info("Read " + rows + " rows, " + cols + " columns");
scanner.close();
table.close();
// These numbers are deterministic, seeded by table name.
assertEquals(19, rows);
assertEquals(35, cols);
} catch (Exception ex) {
LOG.error(ex);
throw ex;
} finally {
cluster.shutdown();
}
}
@Test
public void testHomePath() {
File pom = new File(HBaseHomePath.getHomePath(), "pom.xml");
assertTrue(pom.getPath() + " does not exist", pom.exists());
}
}