HADOOP-2084 Add a LocalHBaseCluster
git-svn-id: https://svn.apache.org/repos/asf/lucene/hadoop/trunk/src/contrib/hbase@588264 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
parent
fa1c77bd3e
commit
1aeb23aeee
@ -6,6 +6,7 @@ Trunk (unreleased changes)
|
||||
|
||||
NEW FEATURES
|
||||
HADOOP-2061 Add new Base64 dialects
|
||||
HADOOP-2084 Add a LocalHBaseCluster
|
||||
|
||||
OPTIMIZATIONS
|
||||
|
||||
|
@ -24,9 +24,10 @@
|
||||
<configuration>
|
||||
<property>
|
||||
<name>hbase.master</name>
|
||||
<value>0.0.0.0:60000</value>
|
||||
<value>local</value>
|
||||
<description>The host and port that the HBase master runs at.
|
||||
TODO: Support 'local' (All running in single context).
|
||||
A value of 'local' runs the master and a regionserver in
|
||||
a single process.
|
||||
</description>
|
||||
</property>
|
||||
<property>
|
||||
|
@ -31,4 +31,12 @@ public class HBaseConfiguration extends Configuration {
|
||||
addResource("hbase-default.xml");
|
||||
addResource("hbase-site.xml");
|
||||
}
|
||||
|
||||
/**
|
||||
* Create a clone of passed configuration.
|
||||
* @param c Configuration to clone.
|
||||
*/
|
||||
public HBaseConfiguration(final Configuration c) {
|
||||
super(c);
|
||||
}
|
||||
}
|
||||
|
@ -126,7 +126,7 @@ public class HConnectionManager implements HConstants {
|
||||
*/
|
||||
@SuppressWarnings("unchecked")
|
||||
public TableServers(Configuration conf) {
|
||||
this.conf = conf;
|
||||
this.conf = LocalHBaseCluster.doLocal(new HBaseConfiguration(conf));
|
||||
|
||||
String serverClassName =
|
||||
conf.get(REGION_SERVER_CLASS, DEFAULT_REGION_SERVER_CLASS);
|
||||
|
@ -44,8 +44,11 @@ public interface HConstants {
|
||||
/** default host address */
|
||||
static final String DEFAULT_HOST = "0.0.0.0";
|
||||
|
||||
static final int DEFAULT_MASTER_PORT = 60000;
|
||||
|
||||
/** Default master address */
|
||||
static final String DEFAULT_MASTER_ADDRESS = DEFAULT_HOST + ":60000";
|
||||
static final String DEFAULT_MASTER_ADDRESS = DEFAULT_HOST + ":" +
|
||||
DEFAULT_MASTER_PORT;
|
||||
|
||||
/** default port for master web api */
|
||||
static final int DEFAULT_MASTER_INFOPORT = 60010;
|
||||
|
@ -35,8 +35,8 @@ import java.util.TimerTask;
|
||||
import java.util.TreeMap;
|
||||
import java.util.TreeSet;
|
||||
import java.util.concurrent.BlockingQueue;
|
||||
import java.util.concurrent.Delayed;
|
||||
import java.util.concurrent.DelayQueue;
|
||||
import java.util.concurrent.Delayed;
|
||||
import java.util.concurrent.LinkedBlockingQueue;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
import java.util.concurrent.atomic.AtomicBoolean;
|
||||
@ -1294,16 +1294,12 @@ HMasterRegionInterface {
|
||||
/** {@inheritDoc} */
|
||||
public HMsg[] regionServerReport(HServerInfo serverInfo, HMsg msgs[])
|
||||
throws IOException {
|
||||
|
||||
String serverName = serverInfo.getServerAddress().toString().trim();
|
||||
long serverLabel = getServerLabel(serverName);
|
||||
|
||||
if (msgs.length > 0 && msgs[0].getMsg() == HMsg.MSG_REPORT_EXITING) {
|
||||
|
||||
// HRegionServer is shutting down. Cancel the server's lease.
|
||||
// Note that cancelling the server's lease takes care of updating
|
||||
// Note that canceling the server's lease takes care of updating
|
||||
// serversToServerInfo, etc.
|
||||
|
||||
if (LOG.isDebugEnabled()) {
|
||||
LOG.debug("Region server " + serverName +
|
||||
": MSG_REPORT_EXITING -- cancelling lease");
|
||||
@ -1312,7 +1308,6 @@ HMasterRegionInterface {
|
||||
if (cancelLease(serverName, serverLabel)) {
|
||||
// Only process the exit message if the server still has a lease.
|
||||
// Otherwise we could end up processing the server exit twice.
|
||||
|
||||
LOG.info("Region server " + serverName +
|
||||
": MSG_REPORT_EXITING -- lease cancelled");
|
||||
// Get all the regions the server was serving reassigned
|
||||
@ -1341,7 +1336,6 @@ HMasterRegionInterface {
|
||||
// Tell server to shut down if we are shutting down. This should
|
||||
// happen after check of MSG_REPORT_EXITING above, since region server
|
||||
// will send us one of these messages after it gets MSG_REGIONSERVER_STOP
|
||||
|
||||
return new HMsg[]{new HMsg(HMsg.MSG_REGIONSERVER_STOP)};
|
||||
}
|
||||
|
||||
@ -2396,7 +2390,6 @@ HMasterRegionInterface {
|
||||
return !closed.get();
|
||||
}
|
||||
|
||||
/** {@inheritDoc} */
|
||||
public void shutdown() {
|
||||
TimerTask tt = new TimerTask() {
|
||||
@Override
|
||||
@ -2408,7 +2401,7 @@ HMasterRegionInterface {
|
||||
}
|
||||
}
|
||||
};
|
||||
Timer t = new Timer("Shutdown");
|
||||
Timer t = new Timer(getName() + "-Shutdown");
|
||||
t.schedule(tt, 10);
|
||||
}
|
||||
|
||||
@ -3028,7 +3021,8 @@ HMasterRegionInterface {
|
||||
}
|
||||
|
||||
protected static void doMain(String [] args,
|
||||
Class<? extends HMaster> masterClass) {
|
||||
Class<? extends HMaster> masterClass)
|
||||
throws IOException {
|
||||
if (args.length < 1) {
|
||||
printUsageAndExit();
|
||||
}
|
||||
@ -3047,10 +3041,15 @@ HMasterRegionInterface {
|
||||
|
||||
if (cmd.equals("start")) {
|
||||
try {
|
||||
Constructor<? extends HMaster> c =
|
||||
masterClass.getConstructor(Configuration.class);
|
||||
HMaster master = c.newInstance(conf);
|
||||
master.start();
|
||||
// If 'local', defer to LocalHBaseCluster instance.
|
||||
if (LocalHBaseCluster.isLocal(conf)) {
|
||||
(new LocalHBaseCluster(conf)).startup();
|
||||
} else {
|
||||
Constructor<? extends HMaster> c =
|
||||
masterClass.getConstructor(Configuration.class);
|
||||
HMaster master = c.newInstance(conf);
|
||||
master.start();
|
||||
}
|
||||
} catch (Throwable t) {
|
||||
LOG.error( "Can not start master", t);
|
||||
System.exit(-1);
|
||||
@ -3060,6 +3059,9 @@ HMasterRegionInterface {
|
||||
|
||||
if (cmd.equals("stop")) {
|
||||
try {
|
||||
if (LocalHBaseCluster.isLocal(conf)) {
|
||||
LocalHBaseCluster.doLocal(conf);
|
||||
}
|
||||
HBaseAdmin adm = new HBaseAdmin(conf);
|
||||
adm.shutdown();
|
||||
} catch (Throwable t) {
|
||||
@ -3077,8 +3079,9 @@ HMasterRegionInterface {
|
||||
/**
|
||||
* Main program
|
||||
* @param args
|
||||
* @throws IOException
|
||||
*/
|
||||
public static void main(String [] args) {
|
||||
public static void main(String [] args) throws IOException {
|
||||
doMain(args, HMaster.class);
|
||||
}
|
||||
}
|
||||
|
@ -63,7 +63,6 @@ class HMerge implements HConstants {
|
||||
*/
|
||||
public static void merge(Configuration conf, FileSystem fs, Text tableName)
|
||||
throws IOException {
|
||||
|
||||
HConnection connection = HConnectionManager.getConnection(conf);
|
||||
boolean masterIsRunning = connection.isMasterRunning();
|
||||
if(tableName.equals(META_TABLE_NAME)) {
|
||||
|
@ -103,7 +103,7 @@ public class HRegionServer implements HConstants, HRegionInterface, Runnable {
|
||||
private final int serverLeaseTimeout;
|
||||
|
||||
// Remote HMaster
|
||||
private final HMasterRegionInterface hbaseMaster;
|
||||
private HMasterRegionInterface hbaseMaster;
|
||||
|
||||
// Server to handle client requests. Default access so can be accessed by
|
||||
// unit tests.
|
||||
@ -422,11 +422,6 @@ public class HRegionServer implements HConstants, HRegionInterface, Runnable {
|
||||
this.leases = new Leases(
|
||||
conf.getInt("hbase.regionserver.lease.period", 3 * 60 * 1000),
|
||||
this.threadWakeFrequency);
|
||||
// Remote HMaster
|
||||
this.hbaseMaster = (HMasterRegionInterface)RPC.waitForProxy(
|
||||
HMasterRegionInterface.class, HMasterRegionInterface.versionID,
|
||||
new HServerAddress(conf.get(MASTER_ADDRESS)).getInetSocketAddress(),
|
||||
conf);
|
||||
}
|
||||
|
||||
/**
|
||||
@ -435,9 +430,6 @@ public class HRegionServer implements HConstants, HRegionInterface, Runnable {
|
||||
* load/unload instructions.
|
||||
*/
|
||||
public void run() {
|
||||
// Set below if HMaster asked us stop.
|
||||
boolean masterRequestedStop = false;
|
||||
|
||||
try {
|
||||
init(reportForDuty());
|
||||
long lastMsg = 0;
|
||||
@ -509,7 +501,6 @@ public class HRegionServer implements HConstants, HRegionInterface, Runnable {
|
||||
if (LOG.isDebugEnabled()) {
|
||||
LOG.debug("Got regionserver stop message");
|
||||
}
|
||||
masterRequestedStop = true;
|
||||
stopRequested.set(true);
|
||||
break;
|
||||
|
||||
@ -600,20 +591,18 @@ public class HRegionServer implements HConstants, HRegionInterface, Runnable {
|
||||
RemoteExceptionHandler.checkIOException(e));
|
||||
}
|
||||
try {
|
||||
if (!masterRequestedStop && closedRegions != null) {
|
||||
HMsg[] exitMsg = new HMsg[closedRegions.size() + 1];
|
||||
exitMsg[0] = new HMsg(HMsg.MSG_REPORT_EXITING);
|
||||
// Tell the master what regions we are/were serving
|
||||
int i = 1;
|
||||
for(HRegion region: closedRegions) {
|
||||
exitMsg[i++] = new HMsg(HMsg.MSG_REPORT_CLOSE,
|
||||
region.getRegionInfo());
|
||||
}
|
||||
|
||||
LOG.info("telling master that region server is shutting down at: " +
|
||||
serverInfo.getServerAddress().toString());
|
||||
hbaseMaster.regionServerReport(serverInfo, exitMsg);
|
||||
HMsg[] exitMsg = new HMsg[closedRegions.size() + 1];
|
||||
exitMsg[0] = new HMsg(HMsg.MSG_REPORT_EXITING);
|
||||
// Tell the master what regions we are/were serving
|
||||
int i = 1;
|
||||
for(HRegion region: closedRegions) {
|
||||
exitMsg[i++] = new HMsg(HMsg.MSG_REPORT_CLOSE,
|
||||
region.getRegionInfo());
|
||||
}
|
||||
|
||||
LOG.info("telling master that region server is shutting down at: " +
|
||||
serverInfo.getServerAddress().toString());
|
||||
hbaseMaster.regionServerReport(serverInfo, exitMsg);
|
||||
} catch (IOException e) {
|
||||
LOG.warn("Failed to send exiting message to master: ",
|
||||
RemoteExceptionHandler.checkIOException(e));
|
||||
@ -783,10 +772,15 @@ public class HRegionServer implements HConstants, HRegionInterface, Runnable {
|
||||
* Let the master know we're here
|
||||
* Run initialization using parameters passed us by the master.
|
||||
*/
|
||||
private MapWritable reportForDuty() {
|
||||
private MapWritable reportForDuty() throws IOException {
|
||||
if (LOG.isDebugEnabled()) {
|
||||
LOG.debug("Telling master we are up");
|
||||
}
|
||||
// Do initial RPC setup.
|
||||
this.hbaseMaster = (HMasterRegionInterface)RPC.waitForProxy(
|
||||
HMasterRegionInterface.class, HMasterRegionInterface.versionID,
|
||||
new HServerAddress(conf.get(MASTER_ADDRESS)).getInetSocketAddress(),
|
||||
this.conf);
|
||||
MapWritable result = null;
|
||||
long lastMsg = 0;
|
||||
while(!stopRequested.get()) {
|
||||
@ -1452,13 +1446,19 @@ public class HRegionServer implements HConstants, HRegionInterface, Runnable {
|
||||
|
||||
if (cmd.equals("start")) {
|
||||
try {
|
||||
|
||||
Constructor<? extends HRegionServer> c =
|
||||
regionServerClass.getConstructor(Configuration.class);
|
||||
HRegionServer hrs = c.newInstance(conf);
|
||||
Thread t = new Thread(hrs);
|
||||
t.setName("regionserver" + hrs.server.getListenerAddress());
|
||||
t.start();
|
||||
// If 'local', don't start a region server here. Defer to
|
||||
// LocalHBaseCluster. It manages 'local' clusters.
|
||||
if (LocalHBaseCluster.isLocal(conf)) {
|
||||
LOG.warn("Not starting a distinct region server because " +
|
||||
"hbase.master is set to 'local' mode");
|
||||
} else {
|
||||
Constructor<? extends HRegionServer> c =
|
||||
regionServerClass.getConstructor(Configuration.class);
|
||||
HRegionServer hrs = c.newInstance(conf);
|
||||
Thread t = new Thread(hrs);
|
||||
t.setName("regionserver" + hrs.server.getListenerAddress());
|
||||
t.start();
|
||||
}
|
||||
} catch (Throwable t) {
|
||||
LOG.error( "Can not start region server because "+
|
||||
StringUtils.stringifyException(t) );
|
||||
|
273
src/java/org/apache/hadoop/hbase/LocalHBaseCluster.java
Normal file
273
src/java/org/apache/hadoop/hbase/LocalHBaseCluster.java
Normal file
@ -0,0 +1,273 @@
|
||||
/**
|
||||
* Copyright 2007 The Apache Software Foundation
|
||||
*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.apache.hadoop.hbase;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.util.ArrayList;
|
||||
import java.util.Collections;
|
||||
import java.util.List;
|
||||
|
||||
import org.apache.commons.logging.Log;
|
||||
import org.apache.commons.logging.LogFactory;
|
||||
import org.apache.hadoop.conf.Configuration;
|
||||
|
||||
/**
|
||||
* This class creates a single process HBase cluster. One thread is created for
|
||||
* a master and one per region server.
|
||||
*
|
||||
* Call {@link #startup()} to start the cluster running and {@link #shutdown()}
|
||||
* to close it all down. {@link #join} the cluster is you want to wait on
|
||||
* shutdown completion.
|
||||
*
|
||||
* <p>Runs master on port 60000 by default. Because we can't just kill the
|
||||
* process -- not till HADOOP-1700 gets fixed and even then.... -- we need to
|
||||
* be able to find the master with a remote client to run shutdown. To use a
|
||||
* port other than 60000, set the hbase.master to a value of 'local:PORT':
|
||||
* that is 'local', not 'localhost', and the port number the master should use
|
||||
* instead of 60000.
|
||||
*
|
||||
* <p>To make 'local' mode more responsive, make values such as
|
||||
* <code>hbase.regionserver.msginterval</code>,
|
||||
* <code>hbase.master.meta.thread.rescanfrequency</code>, and
|
||||
* <code>hbase.server.thread.wakefrequency</code> a second or less.
|
||||
*/
|
||||
public class LocalHBaseCluster implements HConstants {
|
||||
static final Log LOG = LogFactory.getLog(LocalHBaseCluster.class);
|
||||
private final HMaster master;
|
||||
private final List<RegionServerThread> regionThreads;
|
||||
private final static int DEFAULT_NO = 1;
|
||||
public static final String LOCAL = "local";
|
||||
public static final String LOCAL_COLON = LOCAL + ":";
|
||||
private final Configuration conf;
|
||||
|
||||
/**
|
||||
* Constructor.
|
||||
* @param conf
|
||||
* @throws IOException
|
||||
*/
|
||||
public LocalHBaseCluster(final Configuration conf)
|
||||
throws IOException {
|
||||
this(conf, DEFAULT_NO);
|
||||
}
|
||||
|
||||
/**
|
||||
* Constructor.
|
||||
* @param conf Configuration to use. Post construction has the master's
|
||||
* address.
|
||||
* @param noRegionServers Count of regionservers to start.
|
||||
* @throws IOException
|
||||
*/
|
||||
public LocalHBaseCluster(final Configuration conf, final int noRegionServers)
|
||||
throws IOException {
|
||||
super();
|
||||
this.conf = conf;
|
||||
doLocal(conf);
|
||||
// Create the master
|
||||
this.master = new HMaster(conf);
|
||||
// Set the master's port for the HRegionServers
|
||||
conf.set(MASTER_ADDRESS, this.master.getMasterAddress().toString());
|
||||
// Start the HRegionServers. Always have region servers come up on
|
||||
// port '0' so there won't be clashes over default port as unit tests
|
||||
// start/stop ports at different times during the life of the test.
|
||||
conf.set(REGIONSERVER_ADDRESS, DEFAULT_HOST + ":0");
|
||||
this.regionThreads = new ArrayList<RegionServerThread>();
|
||||
for (int i = 0; i < noRegionServers; i++) {
|
||||
addRegionServer();
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Creates a region server.
|
||||
* Call 'start' on the returned thread to make it run.
|
||||
*
|
||||
* @throws IOException
|
||||
* @return Region server added.
|
||||
*/
|
||||
public RegionServerThread addRegionServer() throws IOException {
|
||||
RegionServerThread t = new RegionServerThread(new HRegionServer(conf),
|
||||
this.regionThreads.size());
|
||||
this.regionThreads.add(t);
|
||||
return t;
|
||||
}
|
||||
|
||||
/** runs region servers */
|
||||
public static class RegionServerThread extends Thread {
|
||||
private final HRegionServer regionServer;
|
||||
|
||||
RegionServerThread(final HRegionServer r, final int index) {
|
||||
super(r, "RegionServer:" + index);
|
||||
this.regionServer = r;
|
||||
}
|
||||
|
||||
/** @return the region server */
|
||||
public HRegionServer getRegionServer() {
|
||||
return this.regionServer;
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* @return the HMaster thread
|
||||
*/
|
||||
public HMaster getMaster() {
|
||||
return this.master;
|
||||
}
|
||||
|
||||
/**
|
||||
* @return Read-only list of region server threads.
|
||||
*/
|
||||
public List<RegionServerThread> getRegionServers() {
|
||||
return Collections.unmodifiableList(this.regionThreads);
|
||||
}
|
||||
|
||||
/**
|
||||
* Wait for the specified region server to stop
|
||||
* Removes this thread from list of running threads.
|
||||
* @param serverNumber
|
||||
* @return Name of region server that just went down.
|
||||
*/
|
||||
public String waitOnRegionServer(int serverNumber) {
|
||||
RegionServerThread regionServerThread =
|
||||
this.regionThreads.remove(serverNumber);
|
||||
try {
|
||||
LOG.info("Waiting on " +
|
||||
regionServerThread.getRegionServer().serverInfo.toString());
|
||||
regionServerThread.join();
|
||||
} catch (InterruptedException e) {
|
||||
e.printStackTrace();
|
||||
}
|
||||
return regionServerThread.getName();
|
||||
}
|
||||
|
||||
/**
|
||||
* Wait for Mini HBase Cluster to shut down.
|
||||
* Presumes you've already called {@link #shutdown()}.
|
||||
*/
|
||||
public void join() {
|
||||
if (this.regionThreads != null) {
|
||||
synchronized(this.regionThreads) {
|
||||
for(Thread t: this.regionThreads) {
|
||||
if (t.isAlive()) {
|
||||
try {
|
||||
t.join();
|
||||
} catch (InterruptedException e) {
|
||||
// continue
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
if (this.master != null && this.master.isAlive()) {
|
||||
try {
|
||||
this.master.join();
|
||||
} catch(InterruptedException e) {
|
||||
// continue
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Start the cluster.
|
||||
* @return Address to use contacting master.
|
||||
*/
|
||||
public String startup() {
|
||||
this.master.start();
|
||||
for (RegionServerThread t: this.regionThreads) {
|
||||
t.start();
|
||||
}
|
||||
return this.master.getMasterAddress().toString();
|
||||
}
|
||||
|
||||
/**
|
||||
* Shut down the mini HBase cluster
|
||||
*/
|
||||
public void shutdown() {
|
||||
LOG.debug("Shutting down HBase Cluster");
|
||||
if(this.master != null) {
|
||||
this.master.shutdown();
|
||||
}
|
||||
// regionServerThreads can never be null because they are initialized when
|
||||
// the class is constructed.
|
||||
synchronized(this.regionThreads) {
|
||||
for(Thread t: this.regionThreads) {
|
||||
if (t.isAlive()) {
|
||||
try {
|
||||
t.join();
|
||||
} catch (InterruptedException e) {
|
||||
// continue
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
if (this.master != null) {
|
||||
try {
|
||||
this.master.join();
|
||||
} catch(InterruptedException e) {
|
||||
// continue
|
||||
}
|
||||
}
|
||||
LOG.info("Shutdown " +
|
||||
((this.regionThreads != null)? this.master.getName(): "0 masters") +
|
||||
" " + this.regionThreads.size() + " region server(s)");
|
||||
}
|
||||
|
||||
/**
|
||||
* Changes <code>hbase.master</code> from 'local' to 'localhost:PORT' in
|
||||
* passed Configuration instance.
|
||||
* @param c
|
||||
* @return The passed <code>c</code> configuration modified if hbase.master
|
||||
* value was 'local' otherwise, unaltered.
|
||||
*/
|
||||
static Configuration doLocal(final Configuration c) {
|
||||
if (!isLocal(c)) {
|
||||
return c;
|
||||
}
|
||||
// Need to rewrite address in Configuration if not done already.
|
||||
String address = c.get(MASTER_ADDRESS);
|
||||
String port = address.startsWith(LOCAL_COLON)?
|
||||
address.substring(LOCAL_COLON.length()):
|
||||
Integer.toString(DEFAULT_MASTER_PORT);
|
||||
c.set(MASTER_ADDRESS, "localhost:" + port);
|
||||
return c;
|
||||
}
|
||||
|
||||
/**
|
||||
* @param c Configuration to check.
|
||||
* @return True if a 'local' address in hbase.master value.
|
||||
*/
|
||||
public static boolean isLocal(final Configuration c) {
|
||||
String address = c.get(MASTER_ADDRESS);
|
||||
return address == null || address.equals(LOCAL) ||
|
||||
address.startsWith(LOCAL_COLON);
|
||||
}
|
||||
|
||||
/**
|
||||
* Test things basically work.
|
||||
* @param args
|
||||
* @throws IOException
|
||||
*/
|
||||
public static void main(String[] args) throws IOException {
|
||||
Configuration conf = new HBaseConfiguration();
|
||||
LocalHBaseCluster cluster = new LocalHBaseCluster(conf);
|
||||
cluster.startup();
|
||||
HBaseAdmin admin = new HBaseAdmin(conf);
|
||||
admin.createTable(new HTableDescriptor(cluster.getClass().getName()));
|
||||
cluster.shutdown();
|
||||
}
|
||||
}
|
@ -40,8 +40,8 @@ files: <code>${HBASE_HOME}/conf/hbase-site.xml</code> and
|
||||
</p>
|
||||
<p>
|
||||
<code>hbase-site.xml</code> allows the user to override the properties defined in
|
||||
<code>${HBASE_HOME}/conf/hbase-default.xml</code>. <code>hbase-default.xml</code> itself
|
||||
should never be modified. At a minimum the <code>hbase.master</code> property should be redefined
|
||||
<code>${HBASE_HOME}/conf/hbase-default.xml</code> (<code>hbase-default.xml</code> itself
|
||||
should never be modified). At a minimum the <code>hbase.master</code> property should be redefined
|
||||
in <code>hbase-site.xml</code> to define the <code>host:port</code> pair on which to run the
|
||||
HMaster (<a href="http://wiki.apache.org/lucene-hadoop/Hbase/HbaseArchitecture">read about the
|
||||
HBase master, regionservers, etc</a>):
|
||||
@ -71,12 +71,15 @@ can be set in <code>${HBASE_HOME}/conf/hbase-env.sh</code>.</li>
|
||||
</ul>
|
||||
|
||||
<h2><a name="runandconfirm">Running and Confirming Your Installation</a></h2>
|
||||
<p>
|
||||
If you are running a distributed operation you will need to start the Hadoop daemons
|
||||
<p>If you are running in standalone, non-distributed mode, hbase by default uses
|
||||
the local filesystem.</p>
|
||||
<p>If you are running a distributed cluster you will need to start the Hadoop DFS daemons
|
||||
before starting HBase and stop the daemons after HBase has shut down. Start and
|
||||
stop the Hadoop daemons as per the Hadoop
|
||||
<a href="http://lucene.apache.org/hadoop/api/overview-summary.html">instructions</a>. Afterwards,
|
||||
or if running a standalone operation, start HBase with the following command:
|
||||
stop the Hadoop DFS daemons as per the Hadoop
|
||||
<a href="http://lucene.apache.org/hadoop/api/overview-summary.html">instructions</a>. HBase
|
||||
does not normally use the mapreduce daemons. These do not need to be started.</p>
|
||||
|
||||
<p>Start HBase with the following command:
|
||||
</p>
|
||||
<pre>
|
||||
${HBASE_HOME}/bin/start-hbase.sh
|
||||
|
@ -26,12 +26,13 @@ import org.apache.commons.logging.LogFactory;
|
||||
import org.apache.hadoop.util.ReflectionUtils;
|
||||
|
||||
/**
|
||||
* Abstract base class for HBase cluster junit tests. Spins up cluster on
|
||||
* {@link #setUp()} and takes it down again in {@link #tearDown()}.
|
||||
* Abstract base class for HBase cluster junit tests. Spins up an hbase
|
||||
* cluster in setup and tears it down again in tearDown.
|
||||
*/
|
||||
public abstract class HBaseClusterTestCase extends HBaseTestCase {
|
||||
private static final Log LOG =
|
||||
LogFactory.getLog(HBaseClusterTestCase.class.getName());
|
||||
|
||||
protected MiniHBaseCluster cluster;
|
||||
final boolean miniHdfs;
|
||||
int regionServers;
|
||||
@ -106,14 +107,12 @@ public abstract class HBaseClusterTestCase extends HBaseTestCase {
|
||||
* regionservers and master threads are no long alive.
|
||||
*/
|
||||
public void threadDumpingJoin() {
|
||||
if (this.cluster.regionThreads != null) {
|
||||
synchronized(this.cluster.regionThreads) {
|
||||
for(Thread t: this.cluster.regionThreads) {
|
||||
threadDumpingJoin(t);
|
||||
}
|
||||
if (this.cluster.getRegionThreads() != null) {
|
||||
for(Thread t: this.cluster.getRegionThreads()) {
|
||||
threadDumpingJoin(t);
|
||||
}
|
||||
}
|
||||
threadDumpingJoin(this.cluster.getMasterThread());
|
||||
threadDumpingJoin(this.cluster.getMaster());
|
||||
}
|
||||
|
||||
public void threadDumpingJoin(final Thread t) {
|
||||
|
@ -21,7 +21,6 @@ package org.apache.hadoop.hbase;
|
||||
|
||||
import java.io.File;
|
||||
import java.io.IOException;
|
||||
import java.util.ArrayList;
|
||||
import java.util.List;
|
||||
|
||||
import org.apache.hadoop.conf.Configuration;
|
||||
@ -31,28 +30,19 @@ import org.apache.hadoop.fs.Path;
|
||||
import org.apache.log4j.Logger;
|
||||
|
||||
/**
|
||||
* This class creates a single process HBase cluster for junit testing.
|
||||
* One thread is created for each server.
|
||||
*
|
||||
* <p>TestCases do not need to subclass to start a HBaseCluster. Call
|
||||
* {@link #startMaster(Configuration)} and
|
||||
* {@link #startRegionServers(Configuration, int)} to startup master and
|
||||
* region servers. Save off the returned values and pass them to
|
||||
* {@link #shutdown(org.apache.hadoop.hbase.MiniHBaseCluster.MasterThread, List)}
|
||||
* to shut it all down when done.
|
||||
*
|
||||
* This class creates a single process HBase cluster. One thread is created for
|
||||
* each server.
|
||||
*/
|
||||
public class MiniHBaseCluster implements HConstants {
|
||||
static final Logger LOG =
|
||||
Logger.getLogger(MiniHBaseCluster.class.getName());
|
||||
|
||||
private Configuration conf;
|
||||
private MiniDFSCluster cluster;
|
||||
private FileSystem fs;
|
||||
private boolean shutdownDFS;
|
||||
private Path parentdir;
|
||||
private MasterThread masterThread = null;
|
||||
List<RegionServerThread> regionThreads =
|
||||
java.util.Collections.synchronizedList(new ArrayList<RegionServerThread>());
|
||||
private LocalHBaseCluster hbaseCluster;
|
||||
private boolean deleteOnExit = true;
|
||||
|
||||
/**
|
||||
@ -63,8 +53,7 @@ public class MiniHBaseCluster implements HConstants {
|
||||
* @throws IOException
|
||||
*/
|
||||
public MiniHBaseCluster(Configuration conf, int nRegionNodes)
|
||||
throws IOException {
|
||||
|
||||
throws IOException {
|
||||
this(conf, nRegionNodes, true, true, true);
|
||||
}
|
||||
|
||||
@ -143,108 +132,14 @@ public class MiniHBaseCluster implements HConstants {
|
||||
try {
|
||||
this.parentdir = new Path(conf.get(HBASE_DIR, DEFAULT_HBASE_DIR));
|
||||
fs.mkdirs(parentdir);
|
||||
this.masterThread = startMaster(this.conf);
|
||||
this.regionThreads.addAll(startRegionServers(this.conf, nRegionNodes));
|
||||
this.hbaseCluster = new LocalHBaseCluster(this.conf, nRegionNodes);
|
||||
this.hbaseCluster.startup();
|
||||
} catch(IOException e) {
|
||||
shutdown();
|
||||
throw e;
|
||||
}
|
||||
}
|
||||
|
||||
/** runs the master server */
|
||||
public static class MasterThread extends Thread {
|
||||
private final HMaster master;
|
||||
MasterThread(final HMaster m) {
|
||||
super(m, "Master:" + m.getMasterAddress().toString());
|
||||
this.master = m;
|
||||
}
|
||||
|
||||
/** {@inheritDoc} */
|
||||
@Override
|
||||
public void run() {
|
||||
LOG.info("Starting " + getName());
|
||||
super.run();
|
||||
}
|
||||
|
||||
/** @return master server */
|
||||
public HMaster getMaster() {
|
||||
return this.master;
|
||||
}
|
||||
}
|
||||
|
||||
/** runs region servers */
|
||||
public static class RegionServerThread extends Thread {
|
||||
private final HRegionServer regionServer;
|
||||
RegionServerThread(final HRegionServer r, final int index) {
|
||||
super(r, "RegionServer:" + index);
|
||||
this.regionServer = r;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void run() {
|
||||
LOG.info("Starting " + getName());
|
||||
super.run();
|
||||
}
|
||||
|
||||
/** @return the region server */
|
||||
public HRegionServer getRegionServer() {
|
||||
return this.regionServer;
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Use this method to start a master.
|
||||
* If you want to start an hbase cluster
|
||||
* without subclassing this test case, run this method and
|
||||
* {@link #startRegionServers(Configuration, int)} to start servers.
|
||||
* Call {@link #shutdown(org.apache.hadoop.hbase.MiniHBaseCluster.MasterThread, List)}
|
||||
* to shut them down.
|
||||
* @param c
|
||||
* @return Thread running the master.
|
||||
* @throws IOException
|
||||
* @see #startRegionServers(Configuration, int)
|
||||
* @see #shutdown(org.apache.hadoop.hbase.MiniHBaseCluster.MasterThread, List)
|
||||
*/
|
||||
public static MasterThread startMaster(final Configuration c)
|
||||
throws IOException {
|
||||
|
||||
if(c.get(MASTER_ADDRESS) == null) {
|
||||
c.set(MASTER_ADDRESS, "localhost:0");
|
||||
}
|
||||
// Create the master
|
||||
final HMaster m = new HMaster(c);
|
||||
MasterThread masterThread = new MasterThread(m);
|
||||
// Start up the master
|
||||
masterThread.start();
|
||||
// Set the master's port for the HRegionServers
|
||||
c.set(MASTER_ADDRESS, m.getMasterAddress().toString());
|
||||
return masterThread;
|
||||
}
|
||||
|
||||
/**
|
||||
* @param c
|
||||
* @param count
|
||||
* @return List of region server threads started. Synchronize on the
|
||||
* returned list when iterating to avoid ConcurrentModificationExceptions.
|
||||
* @throws IOException
|
||||
* @see #startMaster(Configuration)
|
||||
*/
|
||||
public static ArrayList<RegionServerThread> startRegionServers(
|
||||
final Configuration c, final int count) throws IOException {
|
||||
|
||||
// Start the HRegionServers. Always have regionservers come up on
|
||||
// port '0' so there won't be clashes over default port as unit tests
|
||||
// start/stop ports at different times during the life of the test.
|
||||
c.set(REGIONSERVER_ADDRESS, DEFAULT_HOST + ":0");
|
||||
LOG.info("Starting HRegionServers");
|
||||
ArrayList<RegionServerThread> threads =
|
||||
new ArrayList<RegionServerThread>();
|
||||
for(int i = 0; i < count; i++) {
|
||||
threads.add(startRegionServer(c, i));
|
||||
}
|
||||
return threads;
|
||||
}
|
||||
|
||||
/**
|
||||
* Starts a region server thread running
|
||||
*
|
||||
@ -252,21 +147,10 @@ public class MiniHBaseCluster implements HConstants {
|
||||
* @return Name of regionserver started.
|
||||
*/
|
||||
public String startRegionServer() throws IOException {
|
||||
RegionServerThread t =
|
||||
startRegionServer(this.conf, this.regionThreads.size());
|
||||
this.regionThreads.add(t);
|
||||
return t.getName();
|
||||
}
|
||||
|
||||
private static RegionServerThread startRegionServer(final Configuration c,
|
||||
final int index)
|
||||
throws IOException {
|
||||
final HRegionServer hrs = new HRegionServer(c);
|
||||
RegionServerThread t = new RegionServerThread(hrs, index);
|
||||
t.setName("regionserver" +
|
||||
t.getRegionServer().server.getListenerAddress().toString());
|
||||
LocalHBaseCluster.RegionServerThread t =
|
||||
this.hbaseCluster.addRegionServer();
|
||||
t.start();
|
||||
return t;
|
||||
return t.getName();
|
||||
}
|
||||
|
||||
/**
|
||||
@ -283,24 +167,24 @@ public class MiniHBaseCluster implements HConstants {
|
||||
* the supplied port is not necessarily the actual port used.
|
||||
*/
|
||||
public HServerAddress getHMasterAddress() {
|
||||
return this.masterThread.getMaster().getMasterAddress();
|
||||
return this.hbaseCluster.getMaster().getMasterAddress();
|
||||
}
|
||||
|
||||
/**
|
||||
* @return the thread running the HMaster
|
||||
* @return the HMaster
|
||||
*/
|
||||
public MasterThread getMasterThread() {
|
||||
return this.masterThread;
|
||||
public HMaster getMaster() {
|
||||
return this.hbaseCluster.getMaster();
|
||||
}
|
||||
|
||||
/**
|
||||
* Cause a region server to exit without cleaning up
|
||||
*
|
||||
* @param serverNumber
|
||||
* @param serverNumber Used as index into a list.
|
||||
*/
|
||||
public void abortRegionServer(int serverNumber) {
|
||||
HRegionServer server =
|
||||
this.regionThreads.get(serverNumber).getRegionServer();
|
||||
this.hbaseCluster.getRegionServers().get(serverNumber).getRegionServer();
|
||||
LOG.info("Aborting " + server.serverInfo.toString());
|
||||
server.abort();
|
||||
}
|
||||
@ -308,12 +192,12 @@ public class MiniHBaseCluster implements HConstants {
|
||||
/**
|
||||
* Shut down the specified region server cleanly
|
||||
*
|
||||
* @param serverNumber
|
||||
* @param serverNumber Used as index into a list.
|
||||
* @return the region server that was stopped
|
||||
*/
|
||||
public HRegionServer stopRegionServer(int serverNumber) {
|
||||
HRegionServer server =
|
||||
this.regionThreads.get(serverNumber).getRegionServer();
|
||||
this.hbaseCluster.getRegionServers().get(serverNumber).getRegionServer();
|
||||
LOG.info("Stopping " + server.toString());
|
||||
server.stop();
|
||||
return server;
|
||||
@ -325,99 +209,28 @@ public class MiniHBaseCluster implements HConstants {
|
||||
* @param serverNumber
|
||||
* @return Name of region server that just went down.
|
||||
*/
|
||||
public String waitOnRegionServer(int serverNumber) {
|
||||
RegionServerThread regionServerThread =
|
||||
this.regionThreads.remove(serverNumber);
|
||||
try {
|
||||
LOG.info("Waiting on " +
|
||||
regionServerThread.getRegionServer().serverInfo.toString());
|
||||
regionServerThread.join();
|
||||
} catch (InterruptedException e) {
|
||||
e.printStackTrace();
|
||||
}
|
||||
return regionServerThread.getName();
|
||||
public String waitOnRegionServer(final int serverNumber) {
|
||||
return this.hbaseCluster.waitOnRegionServer(serverNumber);
|
||||
}
|
||||
|
||||
/**
|
||||
* Wait for Mini HBase Cluster to shut down.
|
||||
*/
|
||||
public void join() {
|
||||
if (regionThreads != null) {
|
||||
synchronized(regionThreads) {
|
||||
for(Thread t: regionThreads) {
|
||||
if (t.isAlive()) {
|
||||
try {
|
||||
t.join();
|
||||
} catch (InterruptedException e) {
|
||||
// continue
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
if (masterThread != null && masterThread.isAlive()) {
|
||||
try {
|
||||
masterThread.join();
|
||||
} catch(InterruptedException e) {
|
||||
// continue
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Shut down HBase cluster started by calling
|
||||
* {@link #startMaster(Configuration)} and then
|
||||
* {@link #startRegionServers(Configuration, int)};
|
||||
* @param masterThread
|
||||
* @param regionServerThreads
|
||||
*/
|
||||
public static void shutdown(final MasterThread masterThread,
|
||||
final List<RegionServerThread> regionServerThreads) {
|
||||
LOG.info("Shutting down HBase Cluster");
|
||||
/** This is not needed. Remove.
|
||||
for(RegionServerThread hsr: regionServerThreads) {
|
||||
hsr.getRegionServer().stop();
|
||||
}
|
||||
*/
|
||||
if(masterThread != null) {
|
||||
masterThread.getMaster().shutdown();
|
||||
}
|
||||
// regionServerThreads can never be null because they are initialized when
|
||||
// the class is constructed.
|
||||
synchronized(regionServerThreads) {
|
||||
for(Thread t: regionServerThreads) {
|
||||
if (t.isAlive()) {
|
||||
try {
|
||||
t.join();
|
||||
} catch (InterruptedException e) {
|
||||
// continue
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
if (masterThread != null) {
|
||||
try {
|
||||
masterThread.join();
|
||||
} catch(InterruptedException e) {
|
||||
// continue
|
||||
}
|
||||
}
|
||||
LOG.info("Shutdown " +
|
||||
((masterThread != null)? masterThread.getName(): "0 masters") + " " +
|
||||
regionServerThreads.size() + " region server(s)");
|
||||
this.hbaseCluster.join();
|
||||
}
|
||||
|
||||
/**
|
||||
* Shut down the mini HBase cluster
|
||||
*/
|
||||
public void shutdown() {
|
||||
MiniHBaseCluster.shutdown(this.masterThread, this.regionThreads);
|
||||
this.hbaseCluster.shutdown();
|
||||
|
||||
try {
|
||||
if (shutdownDFS && cluster != null) {
|
||||
FileSystem fs = cluster.getFileSystem();
|
||||
|
||||
LOG.info("Shutting down Mini DFS cluster");
|
||||
LOG.info("Shutting down Mini DFS ");
|
||||
cluster.shutdown();
|
||||
|
||||
if (fs != null) {
|
||||
@ -454,13 +267,18 @@ public class MiniHBaseCluster implements HConstants {
|
||||
* @throws IOException
|
||||
*/
|
||||
void flushcache() throws IOException {
|
||||
HRegionServer s = this.regionThreads.get(0).getRegionServer();
|
||||
for(HRegion r: s.onlineRegions.values() ) {
|
||||
r.flushcache(false);
|
||||
for (LocalHBaseCluster.RegionServerThread t:
|
||||
this.hbaseCluster.getRegionServers()) {
|
||||
for(HRegion r: t.getRegionServer().onlineRegions.values() ) {
|
||||
r.flushcache(false);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
public List<RegionServerThread> getRegionThreads() {
|
||||
return this.regionThreads;
|
||||
/**
|
||||
* @return List of region server threads.
|
||||
*/
|
||||
public List<LocalHBaseCluster.RegionServerThread> getRegionThreads() {
|
||||
return this.hbaseCluster.getRegionServers();
|
||||
}
|
||||
}
|
||||
|
@ -302,12 +302,12 @@ public class MultiRegionTable extends HBaseTestCase {
|
||||
final HRegionInfo r) throws IOException {
|
||||
|
||||
LOG.info("Starting compaction");
|
||||
for (MiniHBaseCluster.RegionServerThread thread: cluster.regionThreads) {
|
||||
for (LocalHBaseCluster.RegionServerThread thread:
|
||||
cluster.getRegionThreads()) {
|
||||
SortedMap<Text, HRegion> regions = thread.getRegionServer().onlineRegions;
|
||||
|
||||
// Retry if ConcurrentModification... alternative of sync'ing is not
|
||||
// worth it for sake of unit test.
|
||||
|
||||
for (int i = 0; i < 10; i++) {
|
||||
try {
|
||||
for (HRegion online: regions.values()) {
|
||||
|
@ -54,8 +54,9 @@ public class OOMEHMaster extends HMaster {
|
||||
|
||||
/**
|
||||
* @param args
|
||||
* @throws IOException
|
||||
*/
|
||||
public static void main(String[] args) {
|
||||
public static void main(String[] args) throws IOException {
|
||||
doMain(args, OOMEHMaster.class);
|
||||
}
|
||||
}
|
@ -53,7 +53,7 @@ public class TestInfoServers extends HBaseTestCase {
|
||||
a.createTable(new HTableDescriptor(getName()));
|
||||
assertTrue(a.tableExists(new Text(getName())));
|
||||
try {
|
||||
int port = miniHbase.getMasterThread().getMaster().infoServer.getPort();
|
||||
int port = miniHbase.getMaster().infoServer.getPort();
|
||||
assertHasExpectedContent(new URL("http://localhost:" + port +
|
||||
"/index.html"), "Master");
|
||||
port = miniHbase.getRegionThreads().get(0).getRegionServer().
|
||||
|
@ -129,7 +129,8 @@ public class TestLogRolling extends HBaseTestCase {
|
||||
// continue
|
||||
}
|
||||
|
||||
this.logdir = cluster.regionThreads.get(0).getRegionServer().getLog().dir;
|
||||
this.logdir =
|
||||
cluster.getRegionThreads().get(0).getRegionServer().getLog().dir;
|
||||
|
||||
// When the META table can be opened, the region servers are running
|
||||
@SuppressWarnings("unused")
|
||||
|
Loading…
x
Reference in New Issue
Block a user