HBASE-1144 Store the ROOT region location in Zookeeper
git-svn-id: https://svn.apache.org/repos/asf/hadoop/hbase/trunk@739112 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
parent
86e3267abb
commit
f591f1dba6
|
@ -2,6 +2,8 @@ HBase Change Log
|
|||
Release 0.20.0 - Unreleased
|
||||
INCOMPATIBLE CHANGES
|
||||
HBASE-1147 Modify the scripts to use Zookeeper
|
||||
HBASE-1144 Store the ROOT region location in Zookeeper
|
||||
(Nitay Joffe via Stack)
|
||||
|
||||
BUG FIXES
|
||||
HBASE-1140 "ant clean test" fails (Nitay Joffe via Stack)
|
||||
|
|
|
@ -78,6 +78,39 @@ public interface HConstants {
|
|||
/** default port for master web api */
|
||||
static final int DEFAULT_MASTER_INFOPORT = 60010;
|
||||
|
||||
/** Name of ZooKeeper config file in conf/ directory. */
|
||||
static final String ZOOKEEPER_CONFIG_NAME = "zoo.cfg";
|
||||
|
||||
/** Parameter name for ZooKeeper session timeout (in milliseconds). */
|
||||
static final String ZOOKEEPER_SESSION_TIMEOUT = "zookeeper.session.timeout";
|
||||
/** Default ZooKeeper session timeout. In milliseconds. */
|
||||
static final int DEFAULT_ZOOKEEPER_SESSION_TIMEOUT = 10 * 1000;
|
||||
|
||||
/** Parameter name for number of times to retry writes to ZooKeeper. */
|
||||
static final String ZOOKEEPER_RETRIES = "zookeeper.retries";
|
||||
/** Default number of times to retry writes to ZooKeeper. */
|
||||
static final int DEFAULT_ZOOKEEPER_RETRIES = 5;
|
||||
|
||||
/** Parameter name for ZooKeeper pause between retries. In milliseconds. */
|
||||
static final String ZOOKEEPER_PAUSE = "zookeeper.pause";
|
||||
/** Default ZooKeeper pause value. In milliseconds. */
|
||||
static final int DEFAULT_ZOOKEEPER_PAUSE = 2 * 1000;
|
||||
|
||||
/** Parameter name for HBase parent ZNode in ZooKeeper. */
|
||||
static final String ZOOKEEPER_PARENT_ZNODE = "zookeeper.znode.parent";
|
||||
/** Default HBase parent ZNode in ZooKeeper. */
|
||||
static final String DEFAULT_ZOOKEEPER_PARENT_ZNODE = "/hbase";
|
||||
|
||||
/** Parameter name for ZooKeeper ZNode storing root server location. */
|
||||
static final String ZOOKEEPER_ROOT_SERVER_ZNODE = "zookeeper.znode.rootserver";
|
||||
/** Default ZooKeeper ZNode storing root server location. */
|
||||
static final String DEFAULT_ZOOKEEPER_ROOT_SERVER_ZNODE = "root-region-server";
|
||||
|
||||
/** Parameter name for ZooKeeper ZNode storing safe mode. */
|
||||
static final String ZOOKEEPER_SAFE_MODE_ZNODE = "zookeeper.znode.safemode";
|
||||
/** Default ZooKeeper ZNode storing safe mode. */
|
||||
static final String DEFAULT_ZOOKEEPER_SAFE_MODE_ZNODE = "safe-mode";
|
||||
|
||||
/** Parameter name for hbase.regionserver address. */
|
||||
static final String REGIONSERVER_ADDRESS = "hbase.regionserver";
|
||||
|
||||
|
|
|
@ -55,6 +55,7 @@ import org.apache.hadoop.hbase.util.Bytes;
|
|||
import org.apache.hadoop.hbase.util.MetaUtils;
|
||||
import org.apache.hadoop.hbase.util.SoftValueSortedMap;
|
||||
import org.apache.hadoop.hbase.util.Writables;
|
||||
import org.apache.hadoop.hbase.zookeeper.ZooKeeperWrapper;
|
||||
import org.apache.hadoop.ipc.RemoteException;
|
||||
|
||||
/**
|
||||
|
@ -140,7 +141,9 @@ public class HConnectionManager implements HConstants {
|
|||
private final Map<Integer, SoftValueSortedMap<byte [], HRegionLocation>>
|
||||
cachedRegionLocations =
|
||||
new HashMap<Integer, SoftValueSortedMap<byte [], HRegionLocation>>();
|
||||
|
||||
|
||||
private ZooKeeperWrapper zooKeeperWrapper;
|
||||
|
||||
/**
|
||||
* constructor
|
||||
* @param conf Configuration object
|
||||
|
@ -751,6 +754,13 @@ public class HConnectionManager implements HConstants {
|
|||
return server;
|
||||
}
|
||||
|
||||
private synchronized ZooKeeperWrapper getZooKeeperWrapper() throws IOException {
|
||||
if (zooKeeperWrapper == null) {
|
||||
zooKeeperWrapper = new ZooKeeperWrapper(conf);
|
||||
}
|
||||
return zooKeeperWrapper;
|
||||
}
|
||||
|
||||
/*
|
||||
* Repeatedly try to find the root region by asking the master for where it is
|
||||
* @return HRegionLocation for root region if found
|
||||
|
@ -761,12 +771,22 @@ public class HConnectionManager implements HConstants {
|
|||
private HRegionLocation locateRootRegion()
|
||||
throws IOException {
|
||||
getMaster();
|
||||
|
||||
// We lazily instantiate the ZooKeeper object because we don't want to
|
||||
// make the constructor have to throw IOException or handle it itself.
|
||||
ZooKeeperWrapper zooKeeperWrapper = getZooKeeperWrapper();
|
||||
|
||||
HServerAddress rootRegionAddress = null;
|
||||
for (int tries = 0; tries < numRetries; tries++) {
|
||||
int localTimeouts = 0;
|
||||
// ask the master which server has the root region
|
||||
while (rootRegionAddress == null && localTimeouts < numRetries) {
|
||||
rootRegionAddress = master.findRootRegion();
|
||||
// Don't read root region until we're out of safe mode so we know
|
||||
// that the meta regions have been assigned.
|
||||
boolean outOfSafeMode = zooKeeperWrapper.checkOutOfSafeMode();
|
||||
if (outOfSafeMode) {
|
||||
rootRegionAddress = zooKeeperWrapper.readRootRegionLocation();
|
||||
}
|
||||
if (rootRegionAddress == null) {
|
||||
try {
|
||||
if (LOG.isDebugEnabled()) {
|
||||
|
|
|
@ -65,7 +65,10 @@ public interface HBaseRPCProtocolVersion extends VersionedProtocol {
|
|||
* <li>Version 13: HBASE-847</li>
|
||||
* <li>Version 14: HBASE-900</li>
|
||||
* <li>Version 15: HRegionInterface.exists</li>
|
||||
* <li>Version 16: Removed HMasterRegionInterface.getRootRegionLocation and
|
||||
* HMasterInterface.findRootRegion. We use ZooKeeper to store root region
|
||||
* location instead.</li>
|
||||
* </ul>
|
||||
*/
|
||||
public static final long versionID = 15L;
|
||||
public static final long versionID = 16L;
|
||||
}
|
||||
|
|
|
@ -22,7 +22,6 @@ package org.apache.hadoop.hbase.ipc;
|
|||
import java.io.IOException;
|
||||
|
||||
import org.apache.hadoop.hbase.HColumnDescriptor;
|
||||
import org.apache.hadoop.hbase.HServerAddress;
|
||||
import org.apache.hadoop.hbase.HTableDescriptor;
|
||||
import org.apache.hadoop.io.Writable;
|
||||
|
||||
|
@ -117,10 +116,4 @@ public interface HMasterInterface extends HBaseRPCProtocolVersion {
|
|||
* @throws IOException
|
||||
*/
|
||||
public void shutdown() throws IOException;
|
||||
|
||||
/**
|
||||
* Get the location of the root region
|
||||
* @return address of server that serves the root region
|
||||
*/
|
||||
public HServerAddress findRootRegion();
|
||||
}
|
||||
|
|
|
@ -25,7 +25,6 @@ import org.apache.hadoop.io.MapWritable;
|
|||
import org.apache.hadoop.hbase.HServerInfo;
|
||||
import org.apache.hadoop.hbase.HMsg;
|
||||
import org.apache.hadoop.hbase.HRegionInfo;
|
||||
import org.apache.hadoop.hbase.HServerAddress;
|
||||
|
||||
/**
|
||||
* HRegionServers interact with the HMasterRegionInterface to report on local
|
||||
|
@ -62,11 +61,4 @@ public interface HMasterRegionInterface extends HBaseRPCProtocolVersion {
|
|||
public HMsg[] regionServerReport(HServerInfo info, HMsg msgs[],
|
||||
HRegionInfo mostLoadedRegions[])
|
||||
throws IOException;
|
||||
|
||||
/**
|
||||
* @return Root region region server address. Unlike
|
||||
* HMasterInterface.findRootRegion, does not wait until all regions are
|
||||
* assigned.
|
||||
*/
|
||||
public HServerAddress getRootRegionLocation();
|
||||
}
|
|
@ -408,7 +408,7 @@ public class HMaster extends Thread implements HConstants, HMasterInterface,
|
|||
*/
|
||||
private boolean processToDoQueue() {
|
||||
RegionServerOperation op = null;
|
||||
|
||||
|
||||
// block until the root region is online
|
||||
if (regionManager.getRootRegionLocation() != null) {
|
||||
// We can't process server shutdowns unless the root region is online
|
||||
|
@ -858,14 +858,6 @@ public class HMaster extends Thread implements HConstants, HMasterInterface,
|
|||
}
|
||||
}
|
||||
|
||||
public HServerAddress findRootRegion() {
|
||||
HServerAddress rootServer = null;
|
||||
if (!regionManager.inSafeMode()) {
|
||||
rootServer = regionManager.getRootRegionLocation();
|
||||
}
|
||||
return rootServer;
|
||||
}
|
||||
|
||||
/**
|
||||
* @return Server metrics
|
||||
*/
|
||||
|
|
|
@ -41,6 +41,7 @@ import org.apache.commons.logging.LogFactory;
|
|||
import org.apache.hadoop.fs.FileStatus;
|
||||
import org.apache.hadoop.fs.Path;
|
||||
import org.apache.hadoop.fs.PathFilter;
|
||||
import org.apache.hadoop.hbase.HBaseConfiguration;
|
||||
import org.apache.hadoop.hbase.HConstants;
|
||||
import org.apache.hadoop.hbase.HServerAddress;
|
||||
import org.apache.hadoop.hbase.HServerInfo;
|
||||
|
@ -55,6 +56,7 @@ import org.apache.hadoop.hbase.util.Pair;
|
|||
import org.apache.hadoop.hbase.util.Threads;
|
||||
import org.apache.hadoop.hbase.io.BatchUpdate;
|
||||
import org.apache.hadoop.hbase.util.Writables;
|
||||
import org.apache.hadoop.hbase.zookeeper.ZooKeeperWrapper;
|
||||
|
||||
/**
|
||||
* Class to manage assigning regions to servers, state of root and meta, etc.
|
||||
|
@ -126,23 +128,31 @@ class RegionManager implements HConstants {
|
|||
new TreeMap<byte[],Pair<HRegionInfo,HServerAddress>>
|
||||
(Bytes.BYTES_COMPARATOR));
|
||||
|
||||
RegionManager(HMaster master) {
|
||||
private final ZooKeeperWrapper zooKeeperWrapper;
|
||||
private final int zooKeeperNumRetries;
|
||||
private final int zooKeeperPause;
|
||||
|
||||
RegionManager(HMaster master) throws IOException {
|
||||
HBaseConfiguration conf = master.getConfiguration();
|
||||
|
||||
this.master = master;
|
||||
this.historian = RegionHistorian.getInstance();
|
||||
this.maxAssignInOneGo = this.master.getConfiguration().
|
||||
getInt("hbase.regions.percheckin", 10);
|
||||
this.slop = this.master.getConfiguration().getFloat("hbase.regions.slop",
|
||||
(float)0.1);
|
||||
this.maxAssignInOneGo = conf.getInt("hbase.regions.percheckin", 10);
|
||||
this.slop = conf.getFloat("hbase.regions.slop", (float)0.1);
|
||||
|
||||
// The root region
|
||||
rootScannerThread = new RootScanner(master);
|
||||
|
||||
// Scans the meta table
|
||||
metaScannerThread = new MetaScanner(master);
|
||||
|
||||
|
||||
zooKeeperWrapper = new ZooKeeperWrapper(conf);
|
||||
zooKeeperNumRetries = conf.getInt(ZOOKEEPER_RETRIES, DEFAULT_ZOOKEEPER_RETRIES);
|
||||
zooKeeperPause = conf.getInt(ZOOKEEPER_PAUSE, DEFAULT_ZOOKEEPER_PAUSE);
|
||||
|
||||
reassignRootRegion();
|
||||
}
|
||||
|
||||
|
||||
void start() {
|
||||
Threads.setDaemonThreadRunning(rootScannerThread,
|
||||
"RegionManager.rootScanner");
|
||||
|
@ -540,6 +550,7 @@ class RegionManager implements HConstants {
|
|||
} catch(Exception iex) {
|
||||
LOG.warn("meta scanner", iex);
|
||||
}
|
||||
zooKeeperWrapper.close();
|
||||
}
|
||||
|
||||
/**
|
||||
|
@ -895,14 +906,30 @@ class RegionManager implements HConstants {
|
|||
public boolean isInitialMetaScanComplete() {
|
||||
return metaScannerThread.isInitialScanComplete();
|
||||
}
|
||||
|
||||
|
||||
private boolean tellZooKeeperOutOfSafeMode() {
|
||||
for (int attempt = 0; attempt < zooKeeperNumRetries; ++attempt) {
|
||||
if (zooKeeperWrapper.writeOutOfSafeMode()) {
|
||||
return true;
|
||||
}
|
||||
|
||||
sleep(attempt);
|
||||
}
|
||||
|
||||
LOG.error("Failed to tell ZooKeeper we're out of safe mode after " +
|
||||
zooKeeperNumRetries + " retries");
|
||||
|
||||
return false;
|
||||
}
|
||||
|
||||
/**
|
||||
* @return true if the initial meta scan is complete and there are no
|
||||
* unassigned or pending regions
|
||||
*/
|
||||
public boolean inSafeMode() {
|
||||
if (safeMode) {
|
||||
if(isInitialMetaScanComplete() && regionsInTransition.size() == 0) {
|
||||
if(isInitialMetaScanComplete() && regionsInTransition.size() == 0 &&
|
||||
tellZooKeeperOutOfSafeMode()) {
|
||||
master.connection.unsetRootRegionLocation();
|
||||
safeMode = false;
|
||||
LOG.info("exiting safe mode");
|
||||
|
@ -955,11 +982,46 @@ class RegionManager implements HConstants {
|
|||
numberOfMetaRegions.incrementAndGet();
|
||||
}
|
||||
|
||||
private long getPauseTime(int tries) {
|
||||
int attempt = tries;
|
||||
if (attempt >= RETRY_BACKOFF.length) {
|
||||
attempt = RETRY_BACKOFF.length - 1;
|
||||
}
|
||||
return this.zooKeeperPause * RETRY_BACKOFF[attempt];
|
||||
}
|
||||
|
||||
private void sleep(int attempt) {
|
||||
try {
|
||||
Thread.sleep(getPauseTime(attempt));
|
||||
} catch (InterruptedException e) {
|
||||
// continue
|
||||
}
|
||||
}
|
||||
|
||||
private void writeRootRegionLocationToZooKeeper(HServerAddress address) {
|
||||
for (int attempt = 0; attempt < zooKeeperNumRetries; ++attempt) {
|
||||
if (zooKeeperWrapper.writeRootRegionLocation(address)) {
|
||||
return;
|
||||
}
|
||||
|
||||
sleep(attempt);
|
||||
}
|
||||
|
||||
LOG.error("Failed to write root region location to ZooKeeper after " +
|
||||
zooKeeperNumRetries + " retries, shutting down");
|
||||
|
||||
this.master.shutdown();
|
||||
}
|
||||
|
||||
/**
|
||||
* Set the root region location.
|
||||
* @param address Address of the region server where the root lives
|
||||
* @throws IOException If there's a problem connection to ZooKeeper.
|
||||
*/
|
||||
public void setRootRegionLocation(HServerAddress address) {
|
||||
public void setRootRegionLocation(HServerAddress address)
|
||||
throws IOException {
|
||||
writeRootRegionLocationToZooKeeper(address);
|
||||
|
||||
synchronized (rootRegionLocation) {
|
||||
rootRegionLocation.set(new HServerAddress(address));
|
||||
rootRegionLocation.notifyAll();
|
||||
|
@ -1270,4 +1332,4 @@ class RegionManager implements HConstants {
|
|||
return Bytes.compareTo(getRegionName(), o.getRegionName());
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
@ -98,6 +98,7 @@ import org.apache.hadoop.hbase.util.FSUtils;
|
|||
import org.apache.hadoop.hbase.util.InfoServer;
|
||||
import org.apache.hadoop.hbase.util.Sleeper;
|
||||
import org.apache.hadoop.hbase.util.Threads;
|
||||
import org.apache.hadoop.hbase.zookeeper.ZooKeeperWrapper;
|
||||
import org.apache.hadoop.io.MapWritable;
|
||||
import org.apache.hadoop.io.Writable;
|
||||
import org.apache.hadoop.util.Progressable;
|
||||
|
@ -211,6 +212,8 @@ public class HRegionServer implements HConstants, HRegionInterface, Runnable {
|
|||
final Map<String, InternalScanner> scanners =
|
||||
new ConcurrentHashMap<String, InternalScanner>();
|
||||
|
||||
private final ZooKeeperWrapper zooKeeperWrapper;
|
||||
|
||||
/**
|
||||
* Starts a HRegionServer at the default location
|
||||
* @param conf
|
||||
|
@ -292,6 +295,8 @@ public class HRegionServer implements HConstants, HRegionInterface, Runnable {
|
|||
for(int i = 0; i < nbBlocks; i++) {
|
||||
reservedSpace.add(new byte[DEFAULT_SIZE_RESERVATION_BLOCK]);
|
||||
}
|
||||
|
||||
this.zooKeeperWrapper = new ZooKeeperWrapper(conf);
|
||||
}
|
||||
|
||||
/**
|
||||
|
@ -310,7 +315,7 @@ public class HRegionServer implements HConstants, HRegionInterface, Runnable {
|
|||
for (int tries = 0; !stopRequested.get() && isHealthy();) {
|
||||
// Try to get the root region location from the master.
|
||||
if (!haveRootRegion.get()) {
|
||||
HServerAddress rootServer = hbaseMaster.getRootRegionLocation();
|
||||
HServerAddress rootServer = zooKeeperWrapper.readRootRegionLocation();
|
||||
if (rootServer != null) {
|
||||
// By setting the root region location, we bypass the wait imposed on
|
||||
// HTable for all regions being assigned.
|
||||
|
|
|
@ -0,0 +1,50 @@
|
|||
/**
|
||||
* Copyright 2009 The Apache Software Foundation
|
||||
*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.apache.hadoop.hbase.zookeeper;
|
||||
|
||||
import org.apache.zookeeper.WatchedEvent;
|
||||
import org.apache.zookeeper.Watcher;
|
||||
|
||||
/**
|
||||
* Place-holder Watcher.
|
||||
* Does nothing currently.
|
||||
*/
|
||||
public class WatcherWrapper implements Watcher {
|
||||
private final Watcher otherWatcher;
|
||||
|
||||
/**
|
||||
* Construct with a Watcher to pass events to.
|
||||
* @param otherWatcher Watcher to pass events to.
|
||||
*/
|
||||
public WatcherWrapper(Watcher otherWatcher) {
|
||||
this.otherWatcher = otherWatcher;
|
||||
}
|
||||
|
||||
/**
|
||||
* @param event WatchedEvent from ZooKeeper.
|
||||
*/
|
||||
@Override
|
||||
public void process(WatchedEvent event) {
|
||||
if (otherWatcher != null) {
|
||||
otherWatcher.process(event);
|
||||
}
|
||||
}
|
||||
|
||||
}
|
|
@ -0,0 +1,377 @@
|
|||
/**
|
||||
* Copyright 2009 The Apache Software Foundation
|
||||
*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.apache.hadoop.hbase.zookeeper;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.io.InputStream;
|
||||
import java.util.ArrayList;
|
||||
import java.util.List;
|
||||
import java.util.Properties;
|
||||
import java.util.Map.Entry;
|
||||
|
||||
import org.apache.commons.logging.Log;
|
||||
import org.apache.commons.logging.LogFactory;
|
||||
import org.apache.hadoop.hbase.HBaseConfiguration;
|
||||
import org.apache.hadoop.hbase.HConstants;
|
||||
import org.apache.hadoop.hbase.HServerAddress;
|
||||
import org.apache.hadoop.hbase.util.Bytes;
|
||||
import org.apache.zookeeper.CreateMode;
|
||||
import org.apache.zookeeper.KeeperException;
|
||||
import org.apache.zookeeper.Watcher;
|
||||
import org.apache.zookeeper.ZooDefs.Ids;
|
||||
import org.apache.zookeeper.ZooKeeper;
|
||||
import org.apache.zookeeper.ZooKeeper.States;
|
||||
import org.apache.zookeeper.data.Stat;
|
||||
|
||||
/**
|
||||
* Wraps a ZooKeeper instance and adds HBase specific functionality.
|
||||
*
|
||||
* This class provides methods to:
|
||||
* - read/write/delete the root region location in ZooKeeper.
|
||||
* - set/check out of safe mode flag.
|
||||
*/
|
||||
public class ZooKeeperWrapper implements HConstants {
|
||||
protected static final Log LOG = LogFactory.getLog(ZooKeeperWrapper.class);
|
||||
|
||||
// TODO: Replace this with ZooKeeper constant when ZOOKEEPER-277 is resolved.
|
||||
private static final String ZNODE_PATH_SEPARATOR = "/";
|
||||
|
||||
private static String quorumServers = null;
|
||||
static {
|
||||
loadZooKeeperConfig();
|
||||
}
|
||||
|
||||
private final ZooKeeper zooKeeper;
|
||||
private final WatcherWrapper watcher;
|
||||
|
||||
private final String parentZNode;
|
||||
private final String rootRegionZNode;
|
||||
private final String outOfSafeModeZNode;
|
||||
|
||||
/**
|
||||
* Create a ZooKeeperWrapper.
|
||||
* @param conf HBaseConfiguration to read settings from.
|
||||
* @throws IOException If a connection error occurs.
|
||||
*/
|
||||
public ZooKeeperWrapper(HBaseConfiguration conf)
|
||||
throws IOException {
|
||||
this(conf, null);
|
||||
}
|
||||
|
||||
/**
|
||||
* Create a ZooKeeperWrapper.
|
||||
* @param conf HBaseConfiguration to read settings from.
|
||||
* @param watcher ZooKeeper watcher to register.
|
||||
* @throws IOException If a connection error occurs.
|
||||
*/
|
||||
public ZooKeeperWrapper(HBaseConfiguration conf, Watcher watcher)
|
||||
throws IOException {
|
||||
if (quorumServers == null) {
|
||||
throw new IOException("Could not read quorum servers from " +
|
||||
ZOOKEEPER_CONFIG_NAME);
|
||||
}
|
||||
|
||||
int sessionTimeout = conf.getInt(ZOOKEEPER_SESSION_TIMEOUT,
|
||||
DEFAULT_ZOOKEEPER_SESSION_TIMEOUT);
|
||||
this.watcher = new WatcherWrapper(watcher);
|
||||
try {
|
||||
zooKeeper = new ZooKeeper(quorumServers, sessionTimeout, this.watcher);
|
||||
} catch (IOException e) {
|
||||
LOG.error("Failed to create ZooKeeper object: " + e);
|
||||
throw new IOException(e);
|
||||
}
|
||||
|
||||
parentZNode = conf.get(ZOOKEEPER_PARENT_ZNODE,
|
||||
DEFAULT_ZOOKEEPER_PARENT_ZNODE);
|
||||
|
||||
String rootServerZNodeName = conf.get(ZOOKEEPER_ROOT_SERVER_ZNODE,
|
||||
DEFAULT_ZOOKEEPER_ROOT_SERVER_ZNODE);
|
||||
if (rootServerZNodeName.startsWith(ZNODE_PATH_SEPARATOR)) {
|
||||
rootRegionZNode = rootServerZNodeName;
|
||||
} else {
|
||||
rootRegionZNode = parentZNode + ZNODE_PATH_SEPARATOR + rootServerZNodeName;
|
||||
}
|
||||
|
||||
String outOfSafeModeZNodeName = conf.get(ZOOKEEPER_SAFE_MODE_ZNODE,
|
||||
DEFAULT_ZOOKEEPER_SAFE_MODE_ZNODE);
|
||||
if (outOfSafeModeZNodeName.startsWith(ZNODE_PATH_SEPARATOR)) {
|
||||
outOfSafeModeZNode = outOfSafeModeZNodeName;
|
||||
} else {
|
||||
outOfSafeModeZNode = parentZNode + ZNODE_PATH_SEPARATOR +
|
||||
outOfSafeModeZNodeName;
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* This is for tests to directly set the ZooKeeper quorum servers.
|
||||
* @param servers comma separated host:port ZooKeeper quorum servers.
|
||||
*/
|
||||
public static void setQuorumServers(String servers) {
|
||||
quorumServers = servers;
|
||||
}
|
||||
|
||||
private static void loadZooKeeperConfig() {
|
||||
InputStream inputStream =
|
||||
ZooKeeperWrapper.class.getClassLoader().getResourceAsStream(ZOOKEEPER_CONFIG_NAME);
|
||||
if (inputStream == null) {
|
||||
LOG.error("fail to open ZooKeeper config file " + ZOOKEEPER_CONFIG_NAME);
|
||||
return;
|
||||
}
|
||||
|
||||
Properties properties = new Properties();
|
||||
try {
|
||||
properties.load(inputStream);
|
||||
} catch (IOException e) {
|
||||
LOG.error("fail to read properties from " + ZOOKEEPER_CONFIG_NAME);
|
||||
return;
|
||||
}
|
||||
|
||||
String clientPort = null;
|
||||
List<String> servers = new ArrayList<String>();
|
||||
|
||||
// The clientPort option may come after the server.X hosts, so we need to
|
||||
// grab everything and then create the final host:port comma separated list.
|
||||
for (Entry<Object,Object> property : properties.entrySet()) {
|
||||
String key = property.getKey().toString().trim();
|
||||
String value = property.getValue().toString().trim();
|
||||
if (key.equals("clientPort")) {
|
||||
clientPort = value;
|
||||
}
|
||||
else if (key.startsWith("server.")) {
|
||||
String host = value.substring(0, value.indexOf(':'));
|
||||
servers.add(host);
|
||||
}
|
||||
}
|
||||
|
||||
if (clientPort == null) {
|
||||
LOG.error("no clientPort found in " + ZOOKEEPER_CONFIG_NAME);
|
||||
return;
|
||||
}
|
||||
|
||||
// If no server.X lines exist, then we're using a single instance ZooKeeper
|
||||
// on the master node.
|
||||
if (servers.isEmpty()) {
|
||||
HBaseConfiguration conf = new HBaseConfiguration();
|
||||
String masterAddress = conf.get(MASTER_ADDRESS, DEFAULT_MASTER_ADDRESS);
|
||||
String masterHost = "localhost";
|
||||
if (!masterAddress.equals("local")) {
|
||||
masterHost = masterAddress.substring(0, masterAddress.indexOf(':'));
|
||||
}
|
||||
servers.add(masterHost);
|
||||
}
|
||||
|
||||
StringBuilder hostPortBuilder = new StringBuilder();
|
||||
for (int i = 0; i < servers.size(); ++i) {
|
||||
String host = servers.get(i);
|
||||
if (i > 0) {
|
||||
hostPortBuilder.append(',');
|
||||
}
|
||||
hostPortBuilder.append(host);
|
||||
hostPortBuilder.append(':');
|
||||
hostPortBuilder.append(clientPort);
|
||||
}
|
||||
|
||||
quorumServers = hostPortBuilder.toString();
|
||||
LOG.info("Quorum servers: " + quorumServers);
|
||||
}
|
||||
|
||||
/** @return true if currently connected to ZooKeeper, false otherwise. */
|
||||
public boolean isConnected() {
|
||||
return zooKeeper.getState() == States.CONNECTED;
|
||||
}
|
||||
|
||||
/**
|
||||
* Read location of server storing root region.
|
||||
* @return HServerAddress pointing to server serving root region or null if
|
||||
* there was a problem reading the ZNode.
|
||||
*/
|
||||
public HServerAddress readRootRegionLocation() {
|
||||
byte[] data;
|
||||
try {
|
||||
data = zooKeeper.getData(rootRegionZNode, false, null);
|
||||
} catch (InterruptedException e) {
|
||||
return null;
|
||||
} catch (KeeperException e) {
|
||||
return null;
|
||||
}
|
||||
|
||||
String addressString = Bytes.toString(data);
|
||||
LOG.debug("Read ZNode " + rootRegionZNode + " got " + addressString);
|
||||
HServerAddress address = new HServerAddress(addressString);
|
||||
return address;
|
||||
}
|
||||
|
||||
private boolean ensureParentZNodeExists() {
|
||||
try {
|
||||
zooKeeper.create(parentZNode, new byte[0],
|
||||
Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
|
||||
LOG.debug("Created ZNode " + parentZNode);
|
||||
return true;
|
||||
} catch (KeeperException.NodeExistsException e) {
|
||||
return true; // ok, move on.
|
||||
} catch (KeeperException e) {
|
||||
LOG.warn("Failed to create " + parentZNode + ": " + e);
|
||||
} catch (InterruptedException e) {
|
||||
LOG.warn("Failed to create " + parentZNode + ": " + e);
|
||||
}
|
||||
|
||||
return false;
|
||||
}
|
||||
|
||||
/**
|
||||
* Delete ZNode containing root region location.
|
||||
* @return true if operation succeeded, false otherwise.
|
||||
*/
|
||||
public boolean deleteRootRegionLocation() {
|
||||
if (!ensureParentZNodeExists()) {
|
||||
return false;
|
||||
}
|
||||
|
||||
try {
|
||||
zooKeeper.delete(rootRegionZNode, -1);
|
||||
LOG.debug("Deleted ZNode " + rootRegionZNode);
|
||||
return true;
|
||||
} catch (KeeperException.NoNodeException e) {
|
||||
return true; // ok, move on.
|
||||
} catch (KeeperException e) {
|
||||
LOG.warn("Failed to delete " + rootRegionZNode + ": " + e);
|
||||
} catch (InterruptedException e) {
|
||||
LOG.warn("Failed to delete " + rootRegionZNode + ": " + e);
|
||||
}
|
||||
|
||||
return false;
|
||||
}
|
||||
|
||||
private boolean createRootRegionLocation(String address) {
|
||||
byte[] data = Bytes.toBytes(address);
|
||||
try {
|
||||
zooKeeper.create(rootRegionZNode, data, Ids.OPEN_ACL_UNSAFE,
|
||||
CreateMode.PERSISTENT);
|
||||
LOG.debug("Created ZNode " + rootRegionZNode + " with data " + address);
|
||||
return true;
|
||||
} catch (KeeperException e) {
|
||||
LOG.warn("Failed to create root region in ZooKeeper: " + e);
|
||||
} catch (InterruptedException e) {
|
||||
LOG.warn("Failed to create root region in ZooKeeper: " + e);
|
||||
}
|
||||
|
||||
return false;
|
||||
}
|
||||
|
||||
private boolean updateRootRegionLocation(String address) {
|
||||
byte[] data = Bytes.toBytes(address);
|
||||
try {
|
||||
zooKeeper.setData(rootRegionZNode, data, -1);
|
||||
LOG.debug("SetData of ZNode " + rootRegionZNode + " with " + address);
|
||||
return true;
|
||||
} catch (KeeperException e) {
|
||||
LOG.warn("Failed to set root region location in ZooKeeper: " + e);
|
||||
} catch (InterruptedException e) {
|
||||
LOG.warn("Failed to set root region location in ZooKeeper: " + e);
|
||||
}
|
||||
|
||||
return false;
|
||||
}
|
||||
|
||||
/**
|
||||
* Write root region location to ZooKeeper. If address is null, delete ZNode.
|
||||
* containing root region location.
|
||||
* @param address HServerAddress to write to ZK.
|
||||
* @return true if operation succeeded, false otherwise.
|
||||
*/
|
||||
public boolean writeRootRegionLocation(HServerAddress address) {
|
||||
if (address == null) {
|
||||
return deleteRootRegionLocation();
|
||||
}
|
||||
|
||||
if (!ensureParentZNodeExists()) {
|
||||
return false;
|
||||
}
|
||||
|
||||
String addressString = address.toString();
|
||||
|
||||
if (checkExistenceOf(rootRegionZNode)) {
|
||||
return updateRootRegionLocation(addressString);
|
||||
}
|
||||
|
||||
return createRootRegionLocation(addressString);
|
||||
}
|
||||
|
||||
/**
|
||||
* Check if we're out of safe mode. Being out of safe mode is signified by an
|
||||
* ephemeral ZNode existing in ZooKeeper.
|
||||
* @return true if we're out of safe mode, false otherwise.
|
||||
*/
|
||||
public boolean checkOutOfSafeMode() {
|
||||
if (!ensureParentZNodeExists()) {
|
||||
return false;
|
||||
}
|
||||
|
||||
return checkExistenceOf(outOfSafeModeZNode);
|
||||
}
|
||||
|
||||
/**
|
||||
* Create ephemeral ZNode signifying that we're out of safe mode.
|
||||
* @return true if ephemeral ZNode created successfully, false otherwise.
|
||||
*/
|
||||
public boolean writeOutOfSafeMode() {
|
||||
if (!ensureParentZNodeExists()) {
|
||||
return false;
|
||||
}
|
||||
|
||||
try {
|
||||
zooKeeper.create(outOfSafeModeZNode, new byte[0], Ids.OPEN_ACL_UNSAFE,
|
||||
CreateMode.EPHEMERAL);
|
||||
LOG.debug("Wrote out of safe mode");
|
||||
return true;
|
||||
} catch (InterruptedException e) {
|
||||
LOG.warn("Failed to create out of safe mode in ZooKeeper: " + e);
|
||||
} catch (KeeperException e) {
|
||||
LOG.warn("Failed to create out of safe mode in ZooKeeper: " + e);
|
||||
}
|
||||
|
||||
return false;
|
||||
}
|
||||
|
||||
private boolean checkExistenceOf(String path) {
|
||||
Stat stat = null;
|
||||
try {
|
||||
stat = zooKeeper.exists(path, false);
|
||||
} catch (KeeperException e) {
|
||||
LOG.warn("checking existence of " + path, e);
|
||||
} catch (InterruptedException e) {
|
||||
LOG.warn("checking existence of " + path, e);
|
||||
}
|
||||
|
||||
return stat != null;
|
||||
}
|
||||
|
||||
/**
|
||||
* Close this ZooKeeper session.
|
||||
*/
|
||||
public void close() {
|
||||
try {
|
||||
zooKeeper.close();
|
||||
LOG.debug("Closed connection with ZooKeeper");
|
||||
} catch (InterruptedException e) {
|
||||
LOG.warn("Failed to close connection with ZooKeeper");
|
||||
}
|
||||
}
|
||||
}
|
|
@ -68,9 +68,16 @@ operation, continue reading.
|
|||
</p>
|
||||
|
||||
<h2><a name="distributed">Distributed Operation</a></h2>
|
||||
<p>Distributed mode requires an instance of the Hadoop Distributed File System (DFS).
|
||||
<p>Distributed mode requires an instance of the Hadoop Distributed File System (DFS) and a ZooKeeper cluster.
|
||||
See the Hadoop <a href="http://lucene.apache.org/hadoop/api/overview-summary.html#overview_description">
|
||||
requirements and instructions</a> for how to set up a DFS.</p>
|
||||
requirements and instructions</a> for how to set up a DFS.
|
||||
See the ZooKeeeper <a href="http://hadoop.apache.org/zookeeper/docs/current/zookeeperStarted.html">Getting Started Guide</a>
|
||||
for information about the ZooKeeper distributed coordination service.
|
||||
If you do not configure a ZooKeeper cluster, HBase will manage a single instance
|
||||
ZooKeeper service for you running on the master node.
|
||||
This is intended for development and local testing only.
|
||||
It SHOULD NOT be used in a fully-distributed production operation.
|
||||
</p>
|
||||
|
||||
<h3><a name="pseudo-distrib">Pseudo-Distributed Operation</a></h3>
|
||||
<p>A pseudo-distributed operation is simply a distributed operation run on a single host.
|
||||
|
@ -130,6 +137,14 @@ operation requires that you also modify <code>${HBASE_HOME}/conf/regionservers</
|
|||
<code>regionserver</code> lists all the hosts running HRegionServers, one host per line (This file
|
||||
in HBase is like the hadoop slaves file at <code>${HADOOP_HOME}/conf/slaves</code>).
|
||||
</p>
|
||||
<p>
|
||||
Furthermore, you should configure a distributed ZooKeeper cluster.
|
||||
The ZooKeeper configuration file is stored at <code>${HBASE_HOME}/conf/zoo.cfg</code>.
|
||||
See the ZooKeeper <a href="http://hadoop.apache.org/zookeeper/docs/current/zookeeperStarted.html"> Getting Started Guide</a> for information about the format and options of that file.
|
||||
Specifically, look at the <a href="http://hadoop.apache.org/zookeeper/docs/current/zookeeperStarted.html#sc_RunningReplicatedZooKeeper">Running Replicated ZooKeeper</a> section.
|
||||
In <code>${HBASE_HOME}/conf/hbase-env.sh</code>, set <code>HBASE_MANAGES_ZK=false</code> to tell HBase not to manage its own single instance ZooKeeper service.
|
||||
</p>
|
||||
|
||||
<p>Of note, if you have made <i>HDFS client configuration</i> on your hadoop cluster, hbase will not
|
||||
see this configuration unless you do one of the following:
|
||||
<ul>
|
||||
|
|
|
@ -44,3 +44,4 @@ log4j.appender.console.layout.ConversionPattern=%d %-5p [%t] %C{2}(%L): %m%n
|
|||
|
||||
log4j.logger.org.apache.hadoop=WARN
|
||||
log4j.logger.org.apache.hadoop.hbase=DEBUG
|
||||
log4j.logger.org.apache.zookeeper=WARN
|
||||
|
|
|
@ -19,6 +19,8 @@
|
|||
*/
|
||||
package org.apache.hadoop.hbase;
|
||||
|
||||
import java.io.File;
|
||||
import java.io.IOException;
|
||||
import java.io.PrintWriter;
|
||||
|
||||
import org.apache.commons.logging.Log;
|
||||
|
@ -39,8 +41,11 @@ public abstract class HBaseClusterTestCase extends HBaseTestCase {
|
|||
private static final Log LOG = LogFactory.getLog(HBaseClusterTestCase.class);
|
||||
protected MiniHBaseCluster cluster;
|
||||
protected MiniDFSCluster dfsCluster;
|
||||
protected MiniZooKeeperCluster zooKeeperCluster;
|
||||
protected int regionServers;
|
||||
protected int numZooKeeperPeers;
|
||||
protected boolean startDfs;
|
||||
private boolean openMetaTable = true;
|
||||
|
||||
/** default constructor */
|
||||
public HBaseClusterTestCase() {
|
||||
|
@ -57,8 +62,7 @@ public abstract class HBaseClusterTestCase extends HBaseTestCase {
|
|||
this(regionServers, true);
|
||||
}
|
||||
|
||||
/**
|
||||
* Start a MiniHBaseCluster with regionServers region servers in-process to
|
||||
/** in-process to
|
||||
* start with. Optionally, startDfs indicates if a MiniDFSCluster should be
|
||||
* started. If startDfs is false, the assumption is that an external DFS is
|
||||
* configured in hbase-site.xml and is already started, or you have started a
|
||||
|
@ -71,8 +75,13 @@ public abstract class HBaseClusterTestCase extends HBaseTestCase {
|
|||
super();
|
||||
this.startDfs = startDfs;
|
||||
this.regionServers = regionServers;
|
||||
this.numZooKeeperPeers = 1;
|
||||
}
|
||||
|
||||
|
||||
protected void setOpenMetaTable(boolean val) {
|
||||
openMetaTable = val;
|
||||
}
|
||||
|
||||
/**
|
||||
* Run after dfs is ready but before hbase cluster is started up.
|
||||
*/
|
||||
|
@ -84,10 +93,20 @@ public abstract class HBaseClusterTestCase extends HBaseTestCase {
|
|||
* Actually start the MiniHBase instance.
|
||||
*/
|
||||
protected void hBaseClusterSetup() throws Exception {
|
||||
File testDir = new File(getUnitTestdir(getName()).toString());
|
||||
|
||||
// Note that this is done before we create the MiniHBaseCluster because we
|
||||
// need to edit the config to add the ZooKeeper servers.
|
||||
this.zooKeeperCluster = new MiniZooKeeperCluster(conf);
|
||||
this.zooKeeperCluster.startup(numZooKeeperPeers, testDir);
|
||||
|
||||
// start the mini cluster
|
||||
this.cluster = new MiniHBaseCluster(conf, regionServers);
|
||||
// opening the META table ensures that cluster is running
|
||||
new HTable(conf, HConstants.META_TABLE_NAME);
|
||||
|
||||
if (openMetaTable) {
|
||||
// opening the META table ensures that cluster is running
|
||||
new HTable(conf, HConstants.META_TABLE_NAME);
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
|
@ -120,10 +139,10 @@ public abstract class HBaseClusterTestCase extends HBaseTestCase {
|
|||
|
||||
// run the pre-cluster setup
|
||||
preHBaseClusterSetup();
|
||||
|
||||
|
||||
// start the instance
|
||||
hBaseClusterSetup();
|
||||
|
||||
|
||||
// run post-cluster setup
|
||||
postHBaseClusterSetup();
|
||||
} catch (Exception e) {
|
||||
|
@ -131,6 +150,9 @@ public abstract class HBaseClusterTestCase extends HBaseTestCase {
|
|||
if (cluster != null) {
|
||||
cluster.shutdown();
|
||||
}
|
||||
if (zooKeeperCluster != null) {
|
||||
zooKeeperCluster.shutdown();
|
||||
}
|
||||
if (dfsCluster != null) {
|
||||
shutdownDfs(dfsCluster);
|
||||
}
|
||||
|
@ -140,6 +162,10 @@ public abstract class HBaseClusterTestCase extends HBaseTestCase {
|
|||
|
||||
@Override
|
||||
protected void tearDown() throws Exception {
|
||||
if (!openMetaTable) {
|
||||
// open the META table now to ensure cluster is running before shutdown.
|
||||
new HTable(conf, HConstants.META_TABLE_NAME);
|
||||
}
|
||||
super.tearDown();
|
||||
try {
|
||||
HConnectionManager.deleteConnectionInfo(conf, true);
|
||||
|
@ -149,6 +175,11 @@ public abstract class HBaseClusterTestCase extends HBaseTestCase {
|
|||
} catch (Exception e) {
|
||||
LOG.warn("Closing mini dfs", e);
|
||||
}
|
||||
try {
|
||||
this.zooKeeperCluster.shutdown();
|
||||
} catch (IOException e) {
|
||||
LOG.warn("Shutting down ZooKeeper cluster", e);
|
||||
}
|
||||
}
|
||||
if (startDfs) {
|
||||
shutdownDfs(dfsCluster);
|
||||
|
|
|
@ -0,0 +1,312 @@
|
|||
/**
|
||||
* Copyright 2009 The Apache Software Foundation
|
||||
*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.apache.hadoop.hbase;
|
||||
|
||||
import java.io.BufferedReader;
|
||||
import java.io.File;
|
||||
import java.io.IOException;
|
||||
import java.io.InputStreamReader;
|
||||
import java.io.OutputStream;
|
||||
import java.io.Reader;
|
||||
import java.net.InetSocketAddress;
|
||||
import java.net.Socket;
|
||||
import java.util.HashMap;
|
||||
|
||||
import org.apache.commons.logging.Log;
|
||||
import org.apache.commons.logging.LogFactory;
|
||||
import org.apache.hadoop.fs.FileUtil;
|
||||
import org.apache.hadoop.hbase.zookeeper.ZooKeeperWrapper;
|
||||
import org.apache.zookeeper.server.NIOServerCnxn;
|
||||
import org.apache.zookeeper.server.ServerStats;
|
||||
import org.apache.zookeeper.server.ZooKeeperServer;
|
||||
import org.apache.zookeeper.server.persistence.FileTxnLog;
|
||||
import org.apache.zookeeper.server.quorum.QuorumPeer;
|
||||
import org.apache.zookeeper.server.quorum.QuorumStats;
|
||||
import org.apache.zookeeper.server.quorum.QuorumPeer.QuorumServer;
|
||||
|
||||
/**
|
||||
* TODO: Most of the code in this class is ripped from ZooKeeper tests. Instead
|
||||
* of redoing it, we should contribute updates to their code which let us more
|
||||
* easily access testing helper objects.
|
||||
*/
|
||||
public class MiniZooKeeperCluster {
|
||||
private static final Log LOG = LogFactory.getLog(MiniZooKeeperCluster.class);
|
||||
|
||||
// TODO: make this more configurable?
|
||||
private static final int CLIENT_PORT_START = 21810; // use non-standard port
|
||||
private static final int LEADER_PORT_START = 31810; // use non-standard port
|
||||
private static final int TICK_TIME = 2000;
|
||||
private static final int INIT_LIMIT = 3;
|
||||
private static final int SYNC_LIMIT = 3;
|
||||
private static final int CONNECTION_TIMEOUT = 30000;
|
||||
|
||||
private HBaseConfiguration conf;
|
||||
|
||||
private boolean started;
|
||||
private int numPeers;
|
||||
private File baseDir;
|
||||
|
||||
// for distributed mode.
|
||||
private QuorumPeer[] quorumPeers;
|
||||
// for standalone mode.
|
||||
private NIOServerCnxn.Factory standaloneServerFactory;
|
||||
|
||||
/**
|
||||
* @param conf
|
||||
* @throws IOException
|
||||
*/
|
||||
public MiniZooKeeperCluster(HBaseConfiguration conf) throws IOException {
|
||||
this.conf = conf;
|
||||
this.started = false;
|
||||
}
|
||||
|
||||
// / XXX: From o.a.zk.t.ClientBase
|
||||
private static void setupTestEnv() {
|
||||
// during the tests we run with 100K prealloc in the logs.
|
||||
// on windows systems prealloc of 64M was seen to take ~15seconds
|
||||
// resulting in test failure (client timeout on first session).
|
||||
// set env and directly in order to handle static init/gc issues
|
||||
System.setProperty("zookeeper.preAllocSize", "100");
|
||||
FileTxnLog.setPreallocSize(100);
|
||||
}
|
||||
|
||||
/**
|
||||
* @param numPeers
|
||||
* @param baseDir
|
||||
* @throws IOException
|
||||
* @throws InterruptedException
|
||||
*/
|
||||
public void startup(int numPeers, File baseDir) throws IOException,
|
||||
InterruptedException {
|
||||
setupTestEnv();
|
||||
|
||||
shutdown();
|
||||
|
||||
if (numPeers < 1) {
|
||||
return;
|
||||
}
|
||||
|
||||
this.numPeers = numPeers;
|
||||
this.baseDir = baseDir.getAbsoluteFile();
|
||||
if (isDistributed()) {
|
||||
startupDistributed();
|
||||
} else {
|
||||
startupStandalone();
|
||||
}
|
||||
|
||||
started = true;
|
||||
}
|
||||
|
||||
private void startupStandalone() throws IOException, InterruptedException {
|
||||
ServerStats.registerAsConcrete();
|
||||
|
||||
File dir = new File(baseDir, "zookeeper-standalone");
|
||||
recreateDir(dir);
|
||||
|
||||
ZooKeeperServer server = new ZooKeeperServer(dir, dir, TICK_TIME);
|
||||
standaloneServerFactory = new NIOServerCnxn.Factory(CLIENT_PORT_START);
|
||||
standaloneServerFactory.startup(server);
|
||||
|
||||
ZooKeeperWrapper.setQuorumServers("localhost:" + CLIENT_PORT_START);
|
||||
|
||||
if (!waitForServerUp(CLIENT_PORT_START, CONNECTION_TIMEOUT)) {
|
||||
throw new IOException("Waiting for startup of standalone server");
|
||||
}
|
||||
}
|
||||
|
||||
// XXX: From o.a.zk.t.QuorumTest.startServers
|
||||
private void startupDistributed() throws IOException {
|
||||
QuorumStats.registerAsConcrete();
|
||||
|
||||
// Create map of peers
|
||||
HashMap<Long, QuorumServer> peers = new HashMap<Long, QuorumServer>();
|
||||
for (int id = 1; id <= numPeers; ++id) {
|
||||
int port = LEADER_PORT_START + id;
|
||||
InetSocketAddress addr = new InetSocketAddress("localhost", port);
|
||||
QuorumServer server = new QuorumServer(id, addr);
|
||||
peers.put(Long.valueOf(id), server);
|
||||
}
|
||||
|
||||
StringBuffer serversBuffer = new StringBuffer();
|
||||
|
||||
// Initialize each quorum peer.
|
||||
quorumPeers = new QuorumPeer[numPeers];
|
||||
for (int id = 1; id <= numPeers; ++id) {
|
||||
File dir = new File(baseDir, "zookeeper-peer-" + id);
|
||||
recreateDir(dir);
|
||||
|
||||
int port = CLIENT_PORT_START + id;
|
||||
quorumPeers[id - 1] = new QuorumPeer(peers, dir, dir, port, 0, id,
|
||||
TICK_TIME, INIT_LIMIT, SYNC_LIMIT);
|
||||
|
||||
if (id > 1) {
|
||||
serversBuffer.append(",");
|
||||
}
|
||||
serversBuffer.append("localhost:" + port);
|
||||
}
|
||||
|
||||
String servers = serversBuffer.toString();
|
||||
ZooKeeperWrapper.setQuorumServers(servers);
|
||||
|
||||
// Start quorum peer threads.
|
||||
for (QuorumPeer qp : quorumPeers) {
|
||||
qp.start();
|
||||
}
|
||||
|
||||
// Wait for quorum peers to be up before going on.
|
||||
for (int id = 1; id <= numPeers; ++id) {
|
||||
int port = CLIENT_PORT_START + id;
|
||||
if (!waitForServerUp(port, CONNECTION_TIMEOUT)) {
|
||||
throw new IOException("Waiting for startup of peer " + id);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
private void recreateDir(File dir) throws IOException {
|
||||
if (dir.exists()) {
|
||||
FileUtil.fullyDelete(dir);
|
||||
}
|
||||
try {
|
||||
dir.mkdirs();
|
||||
} catch (SecurityException e) {
|
||||
throw new IOException("creating dir: " + dir, e);
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* @throws IOException
|
||||
* @throws InterruptedException
|
||||
*/
|
||||
public void shutdown() throws IOException, InterruptedException {
|
||||
if (!started) {
|
||||
return;
|
||||
}
|
||||
|
||||
if (isDistributed()) {
|
||||
shutdownDistributed();
|
||||
} else {
|
||||
shutdownStandalone();
|
||||
}
|
||||
|
||||
started = false;
|
||||
}
|
||||
|
||||
private boolean isDistributed() {
|
||||
return numPeers > 1;
|
||||
}
|
||||
|
||||
private void shutdownDistributed() throws IOException, InterruptedException {
|
||||
for (QuorumPeer qp : quorumPeers) {
|
||||
qp.shutdown();
|
||||
qp.join(CONNECTION_TIMEOUT);
|
||||
if (qp.isAlive()) {
|
||||
throw new IOException("QuorumPeer " + qp.getId()
|
||||
+ " failed to shutdown");
|
||||
}
|
||||
}
|
||||
|
||||
for (int id = 1; id <= quorumPeers.length; ++id) {
|
||||
int port = CLIENT_PORT_START + id;
|
||||
if (!waitForServerDown(port, CONNECTION_TIMEOUT)) {
|
||||
throw new IOException("Waiting for shutdown of peer " + id);
|
||||
}
|
||||
}
|
||||
|
||||
ServerStats.unregister();
|
||||
}
|
||||
|
||||
private void shutdownStandalone() throws IOException {
|
||||
standaloneServerFactory.shutdown();
|
||||
if (!waitForServerDown(CLIENT_PORT_START, CONNECTION_TIMEOUT)) {
|
||||
throw new IOException("Waiting for shutdown of standalone server");
|
||||
}
|
||||
ServerStats.unregister();
|
||||
}
|
||||
|
||||
// XXX: From o.a.zk.t.ClientBase
|
||||
private static boolean waitForServerDown(int port, long timeout) {
|
||||
long start = System.currentTimeMillis();
|
||||
while (true) {
|
||||
try {
|
||||
Socket sock = new Socket("localhost", port);
|
||||
try {
|
||||
OutputStream outstream = sock.getOutputStream();
|
||||
outstream.write("stat".getBytes());
|
||||
outstream.flush();
|
||||
} finally {
|
||||
sock.close();
|
||||
}
|
||||
} catch (IOException e) {
|
||||
return true;
|
||||
}
|
||||
|
||||
if (System.currentTimeMillis() > start + timeout) {
|
||||
break;
|
||||
}
|
||||
try {
|
||||
Thread.sleep(250);
|
||||
} catch (InterruptedException e) {
|
||||
// ignore
|
||||
}
|
||||
}
|
||||
return false;
|
||||
}
|
||||
|
||||
// XXX: From o.a.zk.t.ClientBase
|
||||
private static boolean waitForServerUp(int port, long timeout) {
|
||||
long start = System.currentTimeMillis();
|
||||
while (true) {
|
||||
try {
|
||||
Socket sock = new Socket("localhost", port);
|
||||
BufferedReader reader = null;
|
||||
try {
|
||||
OutputStream outstream = sock.getOutputStream();
|
||||
outstream.write("stat".getBytes());
|
||||
outstream.flush();
|
||||
|
||||
Reader isr = new InputStreamReader(sock.getInputStream());
|
||||
reader = new BufferedReader(isr);
|
||||
String line = reader.readLine();
|
||||
if (line != null && line.startsWith("Zookeeper version:")) {
|
||||
return true;
|
||||
}
|
||||
} finally {
|
||||
sock.close();
|
||||
if (reader != null) {
|
||||
reader.close();
|
||||
}
|
||||
}
|
||||
} catch (IOException e) {
|
||||
// ignore as this is expected
|
||||
LOG.info("server localhost:" + port + " not up " + e);
|
||||
}
|
||||
|
||||
if (System.currentTimeMillis() > start + timeout) {
|
||||
break;
|
||||
}
|
||||
try {
|
||||
Thread.sleep(250);
|
||||
} catch (InterruptedException e) {
|
||||
// ignore
|
||||
}
|
||||
}
|
||||
return false;
|
||||
}
|
||||
}
|
|
@ -22,6 +22,7 @@ package org.apache.hadoop.hbase;
|
|||
|
||||
import java.io.IOException;
|
||||
|
||||
import org.apache.commons.logging.LogFactory;
|
||||
import org.apache.hadoop.hbase.client.HTable;
|
||||
import org.apache.hadoop.hbase.client.Scanner;
|
||||
import org.apache.hadoop.hbase.io.BatchUpdate;
|
||||
|
|
|
@ -0,0 +1,65 @@
|
|||
/**
|
||||
* Copyright 2009 The Apache Software Foundation
|
||||
*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.apache.hadoop.hbase;
|
||||
|
||||
import java.io.IOException;
|
||||
|
||||
import org.apache.hadoop.hbase.client.HTable;
|
||||
import org.apache.hadoop.hbase.master.HMaster;
|
||||
import org.apache.hadoop.hbase.zookeeper.ZooKeeperWrapper;
|
||||
|
||||
/**
|
||||
*
|
||||
*/
|
||||
public class TestZooKeeper extends HBaseClusterTestCase {
|
||||
@Override
|
||||
protected void setUp() throws Exception {
|
||||
setOpenMetaTable(false);
|
||||
super.setUp();
|
||||
}
|
||||
|
||||
/**
|
||||
* @throws IOException
|
||||
*/
|
||||
public void testWritesRootRegionLocation() throws IOException {
|
||||
ZooKeeperWrapper zooKeeper = new ZooKeeperWrapper(conf);
|
||||
|
||||
boolean outOfSafeMode = zooKeeper.checkOutOfSafeMode();
|
||||
assertFalse(outOfSafeMode);
|
||||
|
||||
HServerAddress zooKeeperRootAddress = zooKeeper.readRootRegionLocation();
|
||||
assertNull(zooKeeperRootAddress);
|
||||
|
||||
HMaster master = cluster.getMaster();
|
||||
HServerAddress masterRootAddress = master.getRootRegionLocation();
|
||||
assertNull(masterRootAddress);
|
||||
|
||||
new HTable(conf, HConstants.META_TABLE_NAME);
|
||||
|
||||
outOfSafeMode = zooKeeper.checkOutOfSafeMode();
|
||||
assertTrue(outOfSafeMode);
|
||||
|
||||
zooKeeperRootAddress = zooKeeper.readRootRegionLocation();
|
||||
assertNotNull(zooKeeperRootAddress);
|
||||
|
||||
masterRootAddress = master.getRootRegionLocation();
|
||||
assertEquals(masterRootAddress, zooKeeperRootAddress);
|
||||
}
|
||||
}
|
Loading…
Reference in New Issue