HBASE-1145 Ensure that there is only 1 Master with Zookeeper
git-svn-id: https://svn.apache.org/repos/asf/hadoop/hbase/trunk@755725 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
parent
351e1ad689
commit
c3a68eaf4c
|
@ -7,6 +7,8 @@ Release 0.20.0 - Unreleased
|
|||
HBASE-1146 Replace the HRS leases with Zookeeper
|
||||
HBASE-61 Create an HBase-specific MapFile implementation
|
||||
(Ryan Rawson via Stack)
|
||||
HBASE-1145 Ensure that there is only 1 Master with Zookeeper (Removes
|
||||
hbase.master) (Nitay Joffe via Stack)
|
||||
|
||||
BUG FIXES
|
||||
HBASE-1140 "ant clean test" fails (Nitay Joffe via Stack)
|
||||
|
@ -100,6 +102,7 @@ Release 0.20.0 - Unreleased
|
|||
HBASE-1265 HLogEdit static constants should be final (Nitay Joffe via Stack)
|
||||
HBASE-1244 ZooKeeperWrapper constants cleanup (Nitay Joffe via Stack)
|
||||
|
||||
|
||||
Release 0.19.0 - 01/21/2009
|
||||
INCOMPATIBLE CHANGES
|
||||
HBASE-885 TableMap and TableReduce should be interfaces
|
||||
|
|
69
bin/hbase
69
bin/hbase
|
@ -182,41 +182,38 @@ fi
|
|||
# restore ordinary behaviour
|
||||
unset IFS
|
||||
|
||||
# We kill the ZK instance using a hard coded port, to be changed
|
||||
if [ "$COMMAND" = "zookeeper" ] && [ "$@" = "start" ] ; then
|
||||
exec "$JAVA" $JAVA_HEAP_MAX -classpath "$CLASSPATH" org.apache.zookeeper.server.quorum.QuorumPeerMain conf/zoo.cfg
|
||||
# figure out which class to run
|
||||
if [ "$COMMAND" = "shell" ] ; then
|
||||
CLASS="org.jruby.Main ${HBASE_HOME}/bin/hirb.rb"
|
||||
elif [ "$COMMAND" = "master" ] ; then
|
||||
CLASS='org.apache.hadoop.hbase.master.HMaster'
|
||||
elif [ "$COMMAND" = "regionserver" ] ; then
|
||||
CLASS='org.apache.hadoop.hbase.regionserver.HRegionServer'
|
||||
elif [ "$COMMAND" = "rest" ] ; then
|
||||
CLASS='org.apache.hadoop.hbase.rest.Dispatcher'
|
||||
elif [ "$COMMAND" = "thrift" ] ; then
|
||||
CLASS='org.apache.hadoop.hbase.thrift.ThriftServer'
|
||||
elif [ "$COMMAND" = "migrate" ] ; then
|
||||
CLASS='org.apache.hadoop.hbase.util.Migrate'
|
||||
elif [ "$COMMAND" = "zookeeper" ] ; then
|
||||
CLASS='org.apache.hadoop.hbase.zookeeper.HQuorumPeer'
|
||||
else
|
||||
# figure out which class to run
|
||||
if [ "$COMMAND" = "shell" ] ; then
|
||||
CLASS="org.jruby.Main ${HBASE_HOME}/bin/hirb.rb"
|
||||
elif [ "$COMMAND" = "master" ] ; then
|
||||
CLASS='org.apache.hadoop.hbase.master.HMaster'
|
||||
elif [ "$COMMAND" = "regionserver" ] ; then
|
||||
CLASS='org.apache.hadoop.hbase.regionserver.HRegionServer'
|
||||
elif [ "$COMMAND" = "rest" ] ; then
|
||||
CLASS='org.apache.hadoop.hbase.rest.Dispatcher'
|
||||
elif [ "$COMMAND" = "thrift" ] ; then
|
||||
CLASS='org.apache.hadoop.hbase.thrift.ThriftServer'
|
||||
elif [ "$COMMAND" = "migrate" ] ; then
|
||||
CLASS='org.apache.hadoop.hbase.util.Migrate'
|
||||
else
|
||||
CLASS=$COMMAND
|
||||
fi
|
||||
|
||||
# Have JVM dump heap if we run out of memory. Files will be 'launch directory'
|
||||
# and are named like the following: java_pid21612.hprof. Apparently it doesn't
|
||||
# 'cost' to have this flag enabled. Its a 1.6 flag only. See:
|
||||
# http://blogs.sun.com/alanb/entry/outofmemoryerror_looks_a_bit_better
|
||||
HBASE_OPTS="$HBASE_OPTS -XX:+HeapDumpOnOutOfMemoryError"
|
||||
HBASE_OPTS="$HBASE_OPTS -Dhbase.log.dir=$HBASE_LOG_DIR"
|
||||
HBASE_OPTS="$HBASE_OPTS -Dhbase.log.file=$HBASE_LOGFILE"
|
||||
HBASE_OPTS="$HBASE_OPTS -Dhbase.home.dir=$HBASE_HOME"
|
||||
HBASE_OPTS="$HBASE_OPTS -Dhbase.id.str=$HBASE_IDENT_STRING"
|
||||
HBASE_OPTS="$HBASE_OPTS -Dhbase.root.logger=${HBASE_ROOT_LOGGER:-INFO,console}"
|
||||
if [ "x$JAVA_LIBRARY_PATH" != "x" ]; then
|
||||
HBASE_OPTS="$HBASE_OPTS -Djava.library.path=$JAVA_LIBRARY_PATH"
|
||||
fi
|
||||
|
||||
# run it
|
||||
exec "$JAVA" $JAVA_HEAP_MAX $HBASE_OPTS -classpath "$CLASSPATH" $CLASS "$@"
|
||||
CLASS=$COMMAND
|
||||
fi
|
||||
|
||||
# Have JVM dump heap if we run out of memory. Files will be 'launch directory'
|
||||
# and are named like the following: java_pid21612.hprof. Apparently it doesn't
|
||||
# 'cost' to have this flag enabled. Its a 1.6 flag only. See:
|
||||
# http://blogs.sun.com/alanb/entry/outofmemoryerror_looks_a_bit_better
|
||||
HBASE_OPTS="$HBASE_OPTS -XX:+HeapDumpOnOutOfMemoryError"
|
||||
HBASE_OPTS="$HBASE_OPTS -Dhbase.log.dir=$HBASE_LOG_DIR"
|
||||
HBASE_OPTS="$HBASE_OPTS -Dhbase.log.file=$HBASE_LOGFILE"
|
||||
HBASE_OPTS="$HBASE_OPTS -Dhbase.home.dir=$HBASE_HOME"
|
||||
HBASE_OPTS="$HBASE_OPTS -Dhbase.id.str=$HBASE_IDENT_STRING"
|
||||
HBASE_OPTS="$HBASE_OPTS -Dhbase.root.logger=${HBASE_ROOT_LOGGER:-INFO,console}"
|
||||
if [ "x$JAVA_LIBRARY_PATH" != "x" ]; then
|
||||
HBASE_OPTS="$HBASE_OPTS -Djava.library.path=$JAVA_LIBRARY_PATH"
|
||||
fi
|
||||
|
||||
# run it
|
||||
exec "$JAVA" $JAVA_HEAP_MAX $HBASE_OPTS -classpath "$CLASSPATH" $CLASS "$@"
|
||||
|
|
|
@ -31,13 +31,27 @@
|
|||
</description>
|
||||
</property>
|
||||
<property>
|
||||
<name>hbase.master</name>
|
||||
<value>local</value>
|
||||
<description>The host and port that the HBase master runs at.
|
||||
A value of 'local' runs the master and a regionserver in
|
||||
a single process.
|
||||
<name>run.distributed</name>
|
||||
<value>false</value>
|
||||
<description>Enable this to run a distributed HBase. When this is disabled
|
||||
(the default) HBase runs all processes on the current host in a local mode.
|
||||
</description>
|
||||
</property>
|
||||
<property>
|
||||
<name>hbase.master.hostname</name>
|
||||
<value>localhost</value>
|
||||
<description>The host that the HBase master runs at.</description>
|
||||
</property>
|
||||
<property>
|
||||
<name>hbase.master.port</name>
|
||||
<value>60000</value>
|
||||
<description>The port master should bind to.</description>
|
||||
</property>
|
||||
<property>
|
||||
<name>hbase.tmp.dir</name>
|
||||
<value>/tmp/hbase-${user.name}</value>
|
||||
<description>Temporary directory on the local filesystem.</description>
|
||||
</property>
|
||||
<property>
|
||||
<name>hbase.master.info.port</name>
|
||||
<value>60010</value>
|
||||
|
|
|
@ -7,6 +7,8 @@ initLimit=10
|
|||
# sending a request and getting an acknowledgement
|
||||
syncLimit=5
|
||||
# the directory where the snapshot is stored.
|
||||
dataDir=/tmp/zookeeper
|
||||
dataDir=${hbase.tmp.dir}/zookeeper
|
||||
# the port at which the clients will connect
|
||||
clientPort=2181
|
||||
|
||||
server.0=${hbase.master.hostname}:2888:3888
|
|
@ -89,13 +89,10 @@ public class LocalHBaseCluster implements HConstants {
|
|||
public LocalHBaseCluster(final HBaseConfiguration conf,
|
||||
final int noRegionServers)
|
||||
throws IOException {
|
||||
super();
|
||||
this.conf = conf;
|
||||
doLocal(conf);
|
||||
// Create the master
|
||||
this.master = new HMaster(conf);
|
||||
// Set the master's port for the HRegionServers
|
||||
conf.set(MASTER_ADDRESS, this.master.getMasterAddress().toString());
|
||||
// Start the HRegionServers. Always have region servers come up on
|
||||
// port '0' so there won't be clashes over default port as unit tests
|
||||
// start/stop ports at different times during the life of the test.
|
||||
|
@ -330,10 +327,11 @@ public class LocalHBaseCluster implements HConstants {
|
|||
if (!isLocal(c)) {
|
||||
return c;
|
||||
}
|
||||
// Need to rewrite address in Configuration if not done already.
|
||||
// Need to rewrite address in Configuration if not done already. This is
|
||||
// for the case when we're using the deprecated master.address property.
|
||||
String address = c.get(MASTER_ADDRESS);
|
||||
if (address == null) {
|
||||
throw new NullPointerException("Address is null for " + MASTER_ADDRESS);
|
||||
return c;
|
||||
}
|
||||
String port = address.startsWith(LOCAL_COLON)?
|
||||
address.substring(LOCAL_COLON.length()):
|
||||
|
@ -341,15 +339,17 @@ public class LocalHBaseCluster implements HConstants {
|
|||
c.set(MASTER_ADDRESS, "localhost:" + port);
|
||||
return c;
|
||||
}
|
||||
|
||||
|
||||
/**
|
||||
* @param c Configuration to check.
|
||||
* @return True if a 'local' address in hbase.master value.
|
||||
*/
|
||||
public static boolean isLocal(final Configuration c) {
|
||||
String address = c.get(MASTER_ADDRESS);
|
||||
return address == null || address.equals(LOCAL) ||
|
||||
address.startsWith(LOCAL_COLON);
|
||||
boolean addressIsLocal = address == null || address.equals(LOCAL) ||
|
||||
address.startsWith(LOCAL_COLON);
|
||||
boolean distributedOff = !c.getBoolean("run.distributed", false);
|
||||
return addressIsLocal && distributedOff;
|
||||
}
|
||||
|
||||
/**
|
||||
|
|
|
@ -38,4 +38,12 @@ public class MasterNotRunningException extends IOException {
|
|||
public MasterNotRunningException(String s) {
|
||||
super(s);
|
||||
}
|
||||
|
||||
/**
|
||||
* Constructor taking another exception.
|
||||
* @param e Exception to grab data from.
|
||||
*/
|
||||
public MasterNotRunningException(Exception e) {
|
||||
super(e);
|
||||
}
|
||||
}
|
||||
|
|
|
@ -198,6 +198,12 @@ public class HConnectionManager implements HConstants {
|
|||
}
|
||||
|
||||
public HMasterInterface getMaster() throws MasterNotRunningException {
|
||||
try {
|
||||
getZooKeeperWrapper();
|
||||
} catch (IOException e) {
|
||||
throw new MasterNotRunningException(e);
|
||||
}
|
||||
|
||||
HServerAddress masterLocation = null;
|
||||
synchronized (this.masterLock) {
|
||||
for (int tries = 0;
|
||||
|
@ -205,10 +211,10 @@ public class HConnectionManager implements HConstants {
|
|||
!this.masterChecked && this.master == null &&
|
||||
tries < numRetries;
|
||||
tries++) {
|
||||
|
||||
masterLocation = new HServerAddress(this.conf.get(MASTER_ADDRESS,
|
||||
DEFAULT_MASTER_ADDRESS));
|
||||
|
||||
try {
|
||||
masterLocation = zooKeeperWrapper.readMasterAddressOrThrow();
|
||||
|
||||
HMasterInterface tryMaster = (HMasterInterface)HBaseRPC.getProxy(
|
||||
HMasterInterface.class, HBaseRPCProtocolVersion.versionID,
|
||||
masterLocation.getInetSocketAddress(), this.conf);
|
||||
|
|
|
@ -23,6 +23,7 @@ import java.io.IOException;
|
|||
import java.lang.management.ManagementFactory;
|
||||
import java.lang.management.RuntimeMXBean;
|
||||
import java.lang.reflect.Constructor;
|
||||
import java.net.InetAddress;
|
||||
import java.util.ArrayList;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
|
@ -92,7 +93,7 @@ import org.apache.hadoop.util.StringUtils;
|
|||
*/
|
||||
public class HMaster extends Thread implements HConstants, HMasterInterface,
|
||||
HMasterRegionInterface {
|
||||
|
||||
|
||||
static final Log LOG = LogFactory.getLog(HMaster.class.getName());
|
||||
|
||||
public long getProtocolVersion(@SuppressWarnings("unused") String protocol,
|
||||
|
@ -116,6 +117,7 @@ public class HMaster extends Thread implements HConstants, HMasterInterface,
|
|||
final long maxRegionOpenTime;
|
||||
final int leaseTimeout;
|
||||
private final ZooKeeperWrapper zooKeeperWrapper;
|
||||
private final ZKMasterAddressWatcher zkMasterAddressWatcher;
|
||||
|
||||
volatile DelayQueue<RegionServerOperation> delayedToDoQueue =
|
||||
new DelayQueue<RegionServerOperation>();
|
||||
|
@ -148,39 +150,38 @@ public class HMaster extends Thread implements HConstants, HMasterInterface,
|
|||
RegionManager regionManager;
|
||||
|
||||
private MasterMetrics metrics;
|
||||
|
||||
/** Build the HMaster out of a raw configuration item.
|
||||
*
|
||||
* @param conf - Configuration object
|
||||
* @throws IOException
|
||||
*/
|
||||
public HMaster(HBaseConfiguration conf) throws IOException {
|
||||
this(new Path(conf.get(HBASE_DIR)),
|
||||
new HServerAddress(conf.get(MASTER_ADDRESS, DEFAULT_MASTER_ADDRESS)),
|
||||
conf);
|
||||
}
|
||||
|
||||
/**
|
||||
* Build the HMaster
|
||||
* @param rd base directory of this HBase instance. Must be fully
|
||||
* qualified so includes filesystem to use.
|
||||
* @param address server address and port number
|
||||
* Build the HMaster out of a raw configuration item.
|
||||
* @param conf configuration
|
||||
*
|
||||
* @throws IOException
|
||||
*/
|
||||
public HMaster(Path rd, HServerAddress address, HBaseConfiguration conf)
|
||||
throws IOException {
|
||||
public HMaster(HBaseConfiguration conf) throws IOException {
|
||||
// find out our address. If it's set in config, use that, otherwise look it
|
||||
// up in DNS.
|
||||
String addressStr = conf.get(MASTER_ADDRESS);
|
||||
if (addressStr == null) {
|
||||
addressStr = conf.get("hbase.master.hostname");
|
||||
if (addressStr == null) {
|
||||
addressStr = InetAddress.getLocalHost().getCanonicalHostName();
|
||||
}
|
||||
addressStr += ":";
|
||||
addressStr += conf.get("hbase.master.port", Integer.toString(DEFAULT_MASTER_PORT));
|
||||
}
|
||||
HServerAddress address = new HServerAddress(addressStr);
|
||||
LOG.info("My address is " + address);
|
||||
|
||||
this.conf = conf;
|
||||
this.rootdir = new Path(conf.get(HBASE_DIR));
|
||||
try {
|
||||
FSUtils.validateRootPath(rd);
|
||||
FSUtils.validateRootPath(this.rootdir);
|
||||
} catch (IOException e) {
|
||||
LOG.fatal("Not starting HMaster because the root directory path '" +
|
||||
rd.toString() + "' is not valid. Check the setting of the" +
|
||||
this.rootdir + "' is not valid. Check the setting of the" +
|
||||
" configuration parameter '" + HBASE_DIR + "'", e);
|
||||
throw e;
|
||||
}
|
||||
this.rootdir = rd;
|
||||
this.threadWakeFrequency = conf.getInt(THREAD_WAKE_FREQUENCY, 10 * 1000);
|
||||
// The filesystem hbase wants to use is probably not what is set into
|
||||
// fs.default.name; its value is probably the default.
|
||||
|
@ -233,7 +234,6 @@ public class HMaster extends Thread implements HConstants, HMasterInterface,
|
|||
|
||||
// The rpc-server port can be ephemeral... ensure we have the correct info
|
||||
this.address = new HServerAddress(server.getListenerAddress());
|
||||
conf.set(MASTER_ADDRESS, address.toString());
|
||||
|
||||
this.connection = ServerConnectionManager.getConnection(conf);
|
||||
|
||||
|
@ -243,14 +243,26 @@ public class HMaster extends Thread implements HConstants, HMasterInterface,
|
|||
this.sleeper = new Sleeper(this.threadWakeFrequency, this.closed);
|
||||
|
||||
zooKeeperWrapper = new ZooKeeperWrapper(conf);
|
||||
zkMasterAddressWatcher = new ZKMasterAddressWatcher(zooKeeperWrapper);
|
||||
serverManager = new ServerManager(this);
|
||||
regionManager = new RegionManager(this);
|
||||
|
||||
writeAddressToZooKeeper();
|
||||
|
||||
// We're almost open for business
|
||||
this.closed.set(false);
|
||||
LOG.info("HMaster initialized on " + this.address.toString());
|
||||
}
|
||||
|
||||
private void writeAddressToZooKeeper() {
|
||||
while (true) {
|
||||
zkMasterAddressWatcher.waitForMasterAddressAvailability();
|
||||
if (zooKeeperWrapper.writeMasterAddress(address)) {
|
||||
return;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
private void bootstrap() throws IOException {
|
||||
LOG.info("BOOTSTRAP: creating ROOT and first META regions");
|
||||
try {
|
||||
|
|
|
@ -0,0 +1,75 @@
|
|||
/**
|
||||
* Copyright 2009 The Apache Software Foundation
|
||||
*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.apache.hadoop.hbase.master;
|
||||
|
||||
import org.apache.commons.logging.Log;
|
||||
import org.apache.commons.logging.LogFactory;
|
||||
import org.apache.hadoop.hbase.zookeeper.ZooKeeperWrapper;
|
||||
import org.apache.zookeeper.WatchedEvent;
|
||||
import org.apache.zookeeper.Watcher;
|
||||
import org.apache.zookeeper.Watcher.Event.EventType;
|
||||
|
||||
/**
|
||||
* ZooKeeper watcher for the master address. Used by the HMaster to wait for
|
||||
* the event when master address ZNode gets deleted. When multiple masters are
|
||||
* brought up, they race to become master by writing to write their address to
|
||||
* ZooKeeper. Whoever wins becomes the master, and the rest wait for that
|
||||
* ephemeral node in ZooKeeper to get deleted (meaning the master went down), at
|
||||
* which point they try to write to it again.
|
||||
*/
|
||||
public class ZKMasterAddressWatcher implements Watcher {
|
||||
private static final Log LOG = LogFactory.getLog(ZKMasterAddressWatcher.class);
|
||||
|
||||
private final ZooKeeperWrapper zooKeeper;
|
||||
|
||||
/**
|
||||
* Create a watcher with a ZooKeeperWrapper instance.
|
||||
* @param zooKeeper ZooKeeperWrapper to use to talk to ZooKeeper.
|
||||
*/
|
||||
public ZKMasterAddressWatcher(ZooKeeperWrapper zooKeeper) {
|
||||
this.zooKeeper = zooKeeper;
|
||||
}
|
||||
|
||||
/**
|
||||
* @see org.apache.zookeeper.Watcher#process(org.apache.zookeeper.WatchedEvent)
|
||||
*/
|
||||
@Override
|
||||
public synchronized void process(WatchedEvent event) {
|
||||
EventType type = event.getType();
|
||||
if (type.equals(EventType.NodeDeleted)) {
|
||||
LOG.debug("Master address ZNode deleted, notifying waiting masters");
|
||||
notifyAll();
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Wait for master address to be available. This sets a watch in ZooKeeper and
|
||||
* blocks until the master address ZNode gets deleted.
|
||||
*/
|
||||
public synchronized void waitForMasterAddressAvailability() {
|
||||
while (zooKeeper.readMasterAddress(this) != null) {
|
||||
try {
|
||||
LOG.debug("Waiting for master address ZNode to be deleted");
|
||||
wait();
|
||||
} catch (InterruptedException e) {
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
|
@ -1128,9 +1128,22 @@ public class HRegionServer implements HConstants, HRegionInterface, HBaseRPCErro
|
|||
* Run initialization using parameters passed us by the master.
|
||||
*/
|
||||
private MapWritable reportForDuty(final Sleeper sleeper) {
|
||||
HServerAddress masterAddress = null;
|
||||
while (masterAddress == null) {
|
||||
if (stopRequested.get()) {
|
||||
return null;
|
||||
}
|
||||
try {
|
||||
masterAddress = zooKeeperWrapper.readMasterAddressOrThrow();
|
||||
} catch (IOException e) {
|
||||
LOG.warn("Unable to read master address from ZooKeeper. Retrying." +
|
||||
" Error was:", e);
|
||||
sleeper.sleep();
|
||||
}
|
||||
}
|
||||
|
||||
if (LOG.isDebugEnabled()) {
|
||||
LOG.debug("Telling master at " +
|
||||
conf.get(MASTER_ADDRESS) + " that we are up");
|
||||
LOG.debug("Telling master at " + masterAddress + " that we are up");
|
||||
}
|
||||
HMasterRegionInterface master = null;
|
||||
while (!stopRequested.get() && master == null) {
|
||||
|
@ -1139,7 +1152,7 @@ public class HRegionServer implements HConstants, HRegionInterface, HBaseRPCErro
|
|||
// should retry indefinitely.
|
||||
master = (HMasterRegionInterface)HBaseRPC.waitForProxy(
|
||||
HMasterRegionInterface.class, HBaseRPCProtocolVersion.versionID,
|
||||
new HServerAddress(conf.get(MASTER_ADDRESS)).getInetSocketAddress(),
|
||||
masterAddress.getInetSocketAddress(),
|
||||
this.conf, -1);
|
||||
} catch (IOException e) {
|
||||
LOG.warn("Unable to connect to master. Retrying. Error was:", e);
|
||||
|
|
|
@ -0,0 +1,135 @@
|
|||
/**
|
||||
* Copyright 2009 The Apache Software Foundation
|
||||
*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.apache.hadoop.hbase.zookeeper;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.io.InputStream;
|
||||
import java.util.Properties;
|
||||
import java.util.Map.Entry;
|
||||
|
||||
import org.apache.commons.logging.Log;
|
||||
import org.apache.commons.logging.LogFactory;
|
||||
import org.apache.hadoop.hbase.HBaseConfiguration;
|
||||
import org.apache.hadoop.hbase.HConstants;
|
||||
import org.apache.zookeeper.server.ServerConfig;
|
||||
import org.apache.zookeeper.server.ZooKeeperServerMain;
|
||||
import org.apache.zookeeper.server.quorum.QuorumPeerConfig;
|
||||
import org.apache.zookeeper.server.quorum.QuorumPeerMain;
|
||||
|
||||
/**
|
||||
* HBase's version of ZooKeeper's QuorumPeer. When HBase is set to manage
|
||||
* ZooKeeper, this class is used to start up QuorumPeer instances. By doing
|
||||
* things in here rather than directly calling to ZooKeeper, we have more
|
||||
* control over the process. Currently, this class allows us to parse the
|
||||
* zoo.cfg and inject variables from HBase's site.xml configuration in.
|
||||
*/
|
||||
public class HQuorumPeer implements HConstants {
|
||||
private static final Log LOG = LogFactory.getLog(HQuorumPeer.class);
|
||||
private static final String VARIABLE_START = "${";
|
||||
private static final int VARIABLE_START_LENGTH = VARIABLE_START.length();
|
||||
private static final String VARIABLE_END = "}";
|
||||
private static final int VARIABLE_END_LENGTH = VARIABLE_END.length();
|
||||
|
||||
/**
|
||||
* Parse ZooKeeper configuration and run a QuorumPeer.
|
||||
* While parsing the zoo.cfg, we substitute variables with values from
|
||||
* hbase-site.xml.
|
||||
* @param args String[] of command line arguments. Not used.
|
||||
*/
|
||||
public static void main(String[] args) {
|
||||
try {
|
||||
parseConfig();
|
||||
} catch (Exception e) {
|
||||
e.printStackTrace();
|
||||
System.exit(-1);
|
||||
}
|
||||
if (ServerConfig.isStandalone()) {
|
||||
ZooKeeperServerMain.main(args);
|
||||
} else {
|
||||
QuorumPeerMain.runPeerFromConfig();
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Parse zoo.cfg, injecting HBase Configuration variables in.
|
||||
* @throws Exception if anything goes wrong parsing config
|
||||
*/
|
||||
public static void parseConfig() throws Exception {
|
||||
ClassLoader cl = HQuorumPeer.class.getClassLoader();
|
||||
InputStream inputStream = cl.getResourceAsStream(ZOOKEEPER_CONFIG_NAME);
|
||||
parseConfig(inputStream);
|
||||
}
|
||||
|
||||
/**
|
||||
* This is a separate method from parseConfig() so that we can test by passing
|
||||
* in our own InputStreams rather than reading directly from zoo.cfg.
|
||||
* Parse zoo.cfg, injecting HBase Configuration variables in.
|
||||
* @param inputStream InputStream to parse.
|
||||
* @throws Exception if anything goes wrong parsing config
|
||||
*/
|
||||
public static void parseConfig(InputStream inputStream) throws Exception {
|
||||
HBaseConfiguration conf = new HBaseConfiguration();
|
||||
Properties properties = new Properties();
|
||||
try {
|
||||
properties.load(inputStream);
|
||||
} catch (IOException e) {
|
||||
String msg = "fail to read properties from " + ZOOKEEPER_CONFIG_NAME;
|
||||
LOG.fatal(msg);
|
||||
throw new IOException(msg);
|
||||
}
|
||||
for (Entry<Object, Object> entry : properties.entrySet()) {
|
||||
String value = entry.getValue().toString().trim();
|
||||
StringBuilder newValue = new StringBuilder();
|
||||
int varStart = value.indexOf(VARIABLE_START);
|
||||
int varEnd = 0;
|
||||
while (varStart != -1) {
|
||||
varEnd = value.indexOf(VARIABLE_END, varStart);
|
||||
if (varEnd == -1) {
|
||||
String msg = "variable at " + varStart + " has no end marker";
|
||||
LOG.fatal(msg);
|
||||
throw new IOException(msg);
|
||||
}
|
||||
String variable = value.substring(varStart + VARIABLE_START_LENGTH, varEnd);
|
||||
|
||||
String substituteValue = System.getProperty(variable);
|
||||
if (substituteValue == null) {
|
||||
substituteValue = conf.get(variable);
|
||||
}
|
||||
if (substituteValue == null) {
|
||||
String msg = "variable " + variable + " not set in system property "
|
||||
+ "or hbase configs";
|
||||
LOG.fatal(msg);
|
||||
throw new IOException(msg);
|
||||
}
|
||||
newValue.append(substituteValue);
|
||||
|
||||
varEnd += VARIABLE_END_LENGTH;
|
||||
varStart = value.indexOf(VARIABLE_START, varEnd);
|
||||
}
|
||||
|
||||
newValue.append(value.substring(varEnd));
|
||||
|
||||
String key = entry.getKey().toString().trim();
|
||||
properties.setProperty(key, newValue.toString());
|
||||
}
|
||||
|
||||
QuorumPeerConfig.parseProperties(properties);
|
||||
}
|
||||
}
|
|
@ -65,14 +65,14 @@ public class ZooKeeperWrapper implements HConstants {
|
|||
private final String rootRegionZNode;
|
||||
private final String outOfSafeModeZNode;
|
||||
private final String rsZNode;
|
||||
private final String masterElectionZNode;
|
||||
|
||||
/**
|
||||
* Create a ZooKeeperWrapper.
|
||||
* @param conf HBaseConfiguration to read settings from.
|
||||
* @throws IOException If a connection error occurs.
|
||||
*/
|
||||
public ZooKeeperWrapper(HBaseConfiguration conf)
|
||||
throws IOException {
|
||||
public ZooKeeperWrapper(HBaseConfiguration conf) throws IOException {
|
||||
this(conf, null);
|
||||
}
|
||||
|
||||
|
@ -105,10 +105,13 @@ public class ZooKeeperWrapper implements HConstants {
|
|||
String outOfSafeModeZNodeName = conf.get("zookeeper.znode.safemode",
|
||||
"safe-mode");
|
||||
String rsZNodeName = conf.get("zookeeper.znode.rs", "rs");
|
||||
String masterAddressZNodeName = conf.get("zookeeper.znode.master",
|
||||
"master");
|
||||
|
||||
rootRegionZNode = getZNode(parentZNode, rootServerZNodeName);
|
||||
outOfSafeModeZNode = getZNode(parentZNode, outOfSafeModeZNodeName);
|
||||
rsZNode = getZNode(parentZNode, rsZNodeName);
|
||||
masterElectionZNode = getZNode(parentZNode, masterAddressZNodeName);
|
||||
}
|
||||
|
||||
/**
|
||||
|
@ -157,16 +160,10 @@ public class ZooKeeperWrapper implements HConstants {
|
|||
return;
|
||||
}
|
||||
|
||||
// If no server.X lines exist, then we're using a single instance ZooKeeper
|
||||
// on the master node.
|
||||
if (servers.isEmpty()) {
|
||||
HBaseConfiguration conf = new HBaseConfiguration();
|
||||
String masterAddress = conf.get(MASTER_ADDRESS, DEFAULT_MASTER_ADDRESS);
|
||||
String masterHost = "localhost";
|
||||
if (!masterAddress.equals("local")) {
|
||||
masterHost = masterAddress.substring(0, masterAddress.indexOf(':'));
|
||||
}
|
||||
servers.add(masterHost);
|
||||
LOG.fatal("No server.X lines found in conf/zoo.cfg. HBase must have a " +
|
||||
"ZooKeeper cluster configured for its operation.");
|
||||
System.exit(-1);
|
||||
}
|
||||
|
||||
StringBuilder hostPortBuilder = new StringBuilder();
|
||||
|
@ -195,17 +192,48 @@ public class ZooKeeperWrapper implements HConstants {
|
|||
* there was a problem reading the ZNode.
|
||||
*/
|
||||
public HServerAddress readRootRegionLocation() {
|
||||
return readAddress(rootRegionZNode, null);
|
||||
}
|
||||
|
||||
/**
|
||||
* Read address of master server.
|
||||
* @return HServerAddress of master server.
|
||||
* @throws IOException if there's a problem reading the ZNode.
|
||||
*/
|
||||
public HServerAddress readMasterAddressOrThrow() throws IOException {
|
||||
return readAddressOrThrow(masterElectionZNode, null);
|
||||
}
|
||||
|
||||
/**
|
||||
* Read master address and set a watch on it.
|
||||
* @param watcher Watcher to set on master address ZNode if not null.
|
||||
* @return HServerAddress of master or null if there was a problem reading the
|
||||
* ZNode. The watcher is set only if the result is not null.
|
||||
*/
|
||||
public HServerAddress readMasterAddress(Watcher watcher) {
|
||||
return readAddress(masterElectionZNode, watcher);
|
||||
}
|
||||
|
||||
private HServerAddress readAddress(String znode, Watcher watcher) {
|
||||
try {
|
||||
return readAddressOrThrow(znode, watcher);
|
||||
} catch (IOException e) {
|
||||
return null;
|
||||
}
|
||||
}
|
||||
|
||||
private HServerAddress readAddressOrThrow(String znode, Watcher watcher) throws IOException {
|
||||
byte[] data;
|
||||
try {
|
||||
data = zooKeeper.getData(rootRegionZNode, false, null);
|
||||
data = zooKeeper.getData(znode, watcher, null);
|
||||
} catch (InterruptedException e) {
|
||||
return null;
|
||||
throw new IOException(e);
|
||||
} catch (KeeperException e) {
|
||||
return null;
|
||||
throw new IOException(e);
|
||||
}
|
||||
|
||||
String addressString = Bytes.toString(data);
|
||||
LOG.debug("Read ZNode " + rootRegionZNode + " got " + addressString);
|
||||
LOG.debug("Read ZNode " + znode + " got " + addressString);
|
||||
HServerAddress address = new HServerAddress(addressString);
|
||||
return address;
|
||||
}
|
||||
|
@ -315,6 +343,26 @@ public class ZooKeeperWrapper implements HConstants {
|
|||
return createRootRegionLocation(addressString);
|
||||
}
|
||||
|
||||
public boolean writeMasterAddress(HServerAddress address) {
|
||||
if (!ensureParentExists(masterElectionZNode)) {
|
||||
return false;
|
||||
}
|
||||
|
||||
String addressStr = address.toString();
|
||||
byte[] data = Bytes.toBytes(addressStr);
|
||||
try {
|
||||
zooKeeper.create(masterElectionZNode, data, Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL);
|
||||
LOG.debug("Wrote master address " + address + " to ZooKeeper");
|
||||
return true;
|
||||
} catch (InterruptedException e) {
|
||||
LOG.warn("Failed to write master address " + address + " to ZooKeeper", e);
|
||||
} catch (KeeperException e) {
|
||||
LOG.warn("Failed to write master address " + address + " to ZooKeeper", e);
|
||||
}
|
||||
|
||||
return false;
|
||||
}
|
||||
|
||||
/**
|
||||
* Check if we're out of safe mode. Being out of safe mode is signified by an
|
||||
* ephemeral ZNode existing in ZooKeeper.
|
||||
|
|
|
@ -42,11 +42,6 @@ public class OOMEHMaster extends HMaster {
|
|||
public OOMEHMaster(HBaseConfiguration conf) throws IOException {
|
||||
super(conf);
|
||||
}
|
||||
|
||||
public OOMEHMaster(Path dir, HServerAddress address, HBaseConfiguration conf)
|
||||
throws IOException {
|
||||
super(dir, address, conf);
|
||||
}
|
||||
|
||||
@Override
|
||||
public HMsg[] regionServerReport(HServerInfo serverInfo, HMsg[] msgs,
|
||||
|
|
|
@ -0,0 +1,64 @@
|
|||
/**
|
||||
* Copyright 2009 The Apache Software Foundation
|
||||
*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.apache.hadoop.hbase.zookeeper;
|
||||
|
||||
import java.io.ByteArrayInputStream;
|
||||
import java.io.InputStream;
|
||||
import java.util.Map;
|
||||
|
||||
import org.apache.hadoop.hbase.HBaseTestCase;
|
||||
import org.apache.zookeeper.server.ServerConfig;
|
||||
import org.apache.zookeeper.server.quorum.QuorumPeerConfig;
|
||||
import org.apache.zookeeper.server.quorum.QuorumPeer.QuorumServer;
|
||||
|
||||
/**
|
||||
* Test for HQuorumPeer.
|
||||
*/
|
||||
public class HQuorumPeerTest extends HBaseTestCase {
|
||||
/** @throws Exception */
|
||||
public void testConfigInjection() throws Exception {
|
||||
String s =
|
||||
"tickTime=2000\n" +
|
||||
"initLimit=10\n" +
|
||||
"syncLimit=5\n" +
|
||||
"dataDir=${hbase.tmp.dir}/zookeeper\n" +
|
||||
"clientPort=2181\n" +
|
||||
"server.0=${hbase.master.hostname}:2888:3888\n";
|
||||
|
||||
InputStream is = new ByteArrayInputStream(s.getBytes());
|
||||
HQuorumPeer.parseConfig(is);
|
||||
|
||||
int tickTime = QuorumPeerConfig.getTickTime();
|
||||
assertEquals(2000, tickTime);
|
||||
int initLimit = QuorumPeerConfig.getInitLimit();
|
||||
assertEquals(10, initLimit);
|
||||
int syncLimit = QuorumPeerConfig.getSyncLimit();
|
||||
assertEquals(5, syncLimit);
|
||||
String userName = System.getProperty("user.name");
|
||||
String dataDir = "/tmp/hbase-" + userName + "/zookeeper";
|
||||
assertEquals(dataDir, ServerConfig.getDataDir());
|
||||
assertEquals(2181, ServerConfig.getClientPort());
|
||||
Map<Long,QuorumServer> servers = QuorumPeerConfig.getServers();
|
||||
assertEquals(1, servers.size());
|
||||
assertTrue(servers.containsKey(Long.valueOf(0)));
|
||||
QuorumServer server = servers.get(Long.valueOf(0));
|
||||
assertEquals("localhost", server.addr.getHostName());
|
||||
}
|
||||
}
|
Loading…
Reference in New Issue