HBASE-1345 Remove distributed mode from MiniZooKeeper
git-svn-id: https://svn.apache.org/repos/asf/hadoop/hbase/trunk@773757 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
parent
0d8fdd9378
commit
58e206e9c8
|
@ -226,6 +226,7 @@ Release 0.20.0 - Unreleased
|
|||
HBASE-1405 Threads.shutdown has unnecessary branch
|
||||
HBASE-1407 Changing internal structure of ImmutableBytesWritable
|
||||
contructor (Erik Holstad via Stack)
|
||||
HBASE-1345 Remove distributed mode from MiniZooKeeper (Nitay Joffe via Stack)
|
||||
|
||||
OPTIMIZATIONS
|
||||
|
||||
|
|
|
@ -139,6 +139,11 @@ public class ZooKeeperWrapper implements HConstants {
|
|||
quorumServers = servers;
|
||||
}
|
||||
|
||||
/** @return comma separated host:port list of ZooKeeper quorum servers. */
|
||||
public static String getQuorumServers() {
|
||||
return quorumServers;
|
||||
}
|
||||
|
||||
private static void loadZooKeeperConfig() {
|
||||
Properties properties = null;
|
||||
try {
|
||||
|
|
|
@ -43,7 +43,6 @@ public abstract class HBaseClusterTestCase extends HBaseTestCase {
|
|||
protected MiniDFSCluster dfsCluster;
|
||||
protected MiniZooKeeperCluster zooKeeperCluster;
|
||||
protected int regionServers;
|
||||
protected int numZooKeeperPeers;
|
||||
protected boolean startDfs;
|
||||
private boolean openMetaTable = true;
|
||||
|
||||
|
@ -75,7 +74,6 @@ public abstract class HBaseClusterTestCase extends HBaseTestCase {
|
|||
super();
|
||||
this.startDfs = startDfs;
|
||||
this.regionServers = regionServers;
|
||||
this.numZooKeeperPeers = 1;
|
||||
}
|
||||
|
||||
protected void setOpenMetaTable(boolean val) {
|
||||
|
@ -98,7 +96,7 @@ public abstract class HBaseClusterTestCase extends HBaseTestCase {
|
|||
// Note that this is done before we create the MiniHBaseCluster because we
|
||||
// need to edit the config to add the ZooKeeper servers.
|
||||
this.zooKeeperCluster = new MiniZooKeeperCluster();
|
||||
this.zooKeeperCluster.startup(numZooKeeperPeers, testDir);
|
||||
this.zooKeeperCluster.startup(testDir);
|
||||
|
||||
// start the mini cluster
|
||||
this.cluster = new MiniHBaseCluster(conf, regionServers);
|
||||
|
|
|
@ -26,9 +26,7 @@ import java.io.InputStreamReader;
|
|||
import java.io.OutputStream;
|
||||
import java.io.Reader;
|
||||
import java.net.BindException;
|
||||
import java.net.InetSocketAddress;
|
||||
import java.net.Socket;
|
||||
import java.util.HashMap;
|
||||
|
||||
import org.apache.commons.logging.Log;
|
||||
import org.apache.commons.logging.LogFactory;
|
||||
|
@ -37,8 +35,6 @@ import org.apache.hadoop.hbase.zookeeper.ZooKeeperWrapper;
|
|||
import org.apache.zookeeper.server.NIOServerCnxn;
|
||||
import org.apache.zookeeper.server.ZooKeeperServer;
|
||||
import org.apache.zookeeper.server.persistence.FileTxnLog;
|
||||
import org.apache.zookeeper.server.quorum.QuorumPeer;
|
||||
import org.apache.zookeeper.server.quorum.QuorumPeer.QuorumServer;
|
||||
|
||||
/**
|
||||
* TODO: Most of the code in this class is ripped from ZooKeeper tests. Instead
|
||||
|
@ -49,21 +45,12 @@ public class MiniZooKeeperCluster {
|
|||
private static final Log LOG = LogFactory.getLog(MiniZooKeeperCluster.class);
|
||||
|
||||
// TODO: make this more configurable?
|
||||
private static final int LEADER_PORT_START = 31810; // use non-standard port
|
||||
private static final int TICK_TIME = 2000;
|
||||
private static final int INIT_LIMIT = 3;
|
||||
private static final int SYNC_LIMIT = 3;
|
||||
private static final int CONNECTION_TIMEOUT = 30000;
|
||||
|
||||
private boolean started;
|
||||
private int clientPortStart = 21810; // use non-standard port
|
||||
private int numPeers;
|
||||
private File baseDir;
|
||||
private String quorumServers;
|
||||
private int clientPort = 21810; // use non-standard port
|
||||
|
||||
// for distributed mode.
|
||||
private QuorumPeer[] quorumPeers;
|
||||
// for standalone mode.
|
||||
private NIOServerCnxn.Factory standaloneServerFactory;
|
||||
|
||||
/** Create mini ZooKeeper cluster. */
|
||||
|
@ -82,109 +69,41 @@ public class MiniZooKeeperCluster {
|
|||
}
|
||||
|
||||
/**
|
||||
* @return String ZooKeeper quorum servers.
|
||||
*/
|
||||
public String getQuorumServers() {
|
||||
return quorumServers;
|
||||
}
|
||||
|
||||
/**
|
||||
* @param numPeers
|
||||
* @param baseDir
|
||||
* @throws IOException
|
||||
* @throws InterruptedException
|
||||
*/
|
||||
public void startup(int numPeers, File baseDir) throws IOException,
|
||||
public void startup(File baseDir) throws IOException,
|
||||
InterruptedException {
|
||||
setupTestEnv();
|
||||
|
||||
shutdown();
|
||||
|
||||
if (numPeers < 1) {
|
||||
return;
|
||||
}
|
||||
|
||||
this.numPeers = numPeers;
|
||||
this.baseDir = baseDir.getAbsoluteFile();
|
||||
if (isDistributed()) {
|
||||
startupDistributed();
|
||||
} else {
|
||||
startupStandalone();
|
||||
}
|
||||
|
||||
started = true;
|
||||
}
|
||||
|
||||
private void startupStandalone() throws IOException, InterruptedException {
|
||||
File dir = new File(baseDir, "zookeeper-standalone");
|
||||
File dir = new File(baseDir, "zookeeper").getAbsoluteFile();
|
||||
recreateDir(dir);
|
||||
|
||||
ZooKeeperServer server = new ZooKeeperServer(dir, dir, TICK_TIME);
|
||||
while (true) {
|
||||
try {
|
||||
standaloneServerFactory = new NIOServerCnxn.Factory(clientPortStart);
|
||||
standaloneServerFactory = new NIOServerCnxn.Factory(clientPort);
|
||||
} catch (BindException e) {
|
||||
LOG.info("Faild binding ZK Server to client port: " + clientPortStart);
|
||||
LOG.info("Faild binding ZK Server to client port: " + clientPort);
|
||||
//this port is already in use. try to use another
|
||||
clientPortStart++;
|
||||
clientPort++;
|
||||
continue;
|
||||
}
|
||||
break;
|
||||
}
|
||||
standaloneServerFactory.startup(server);
|
||||
|
||||
quorumServers = "localhost:" + clientPortStart;
|
||||
String quorumServers = "localhost:" + clientPort;
|
||||
ZooKeeperWrapper.setQuorumServers(quorumServers);
|
||||
|
||||
if (!waitForServerUp(clientPortStart, CONNECTION_TIMEOUT)) {
|
||||
if (!waitForServerUp(clientPort, CONNECTION_TIMEOUT)) {
|
||||
throw new IOException("Waiting for startup of standalone server");
|
||||
}
|
||||
}
|
||||
|
||||
// XXX: From o.a.zk.t.QuorumTest.startServers
|
||||
private void startupDistributed() throws IOException {
|
||||
// Create map of peers
|
||||
HashMap<Long, QuorumServer> peers = new HashMap<Long, QuorumServer>();
|
||||
for (int id = 1; id <= numPeers; ++id) {
|
||||
int port = LEADER_PORT_START + id;
|
||||
InetSocketAddress addr = new InetSocketAddress("localhost", port);
|
||||
QuorumServer server = new QuorumServer(id, addr);
|
||||
peers.put(Long.valueOf(id), server);
|
||||
}
|
||||
|
||||
StringBuffer serversBuffer = new StringBuffer();
|
||||
|
||||
// Initialize each quorum peer.
|
||||
quorumPeers = new QuorumPeer[numPeers];
|
||||
for (int id = 1; id <= numPeers; ++id) {
|
||||
File dir = new File(baseDir, "zookeeper-peer-" + id);
|
||||
recreateDir(dir);
|
||||
|
||||
int port = clientPortStart + id;
|
||||
quorumPeers[id - 1] = new QuorumPeer(peers, dir, dir, port, 0, id,
|
||||
TICK_TIME, INIT_LIMIT, SYNC_LIMIT);
|
||||
|
||||
if (id > 1) {
|
||||
serversBuffer.append(",");
|
||||
}
|
||||
serversBuffer.append("localhost:" + port);
|
||||
}
|
||||
|
||||
quorumServers = serversBuffer.toString();
|
||||
ZooKeeperWrapper.setQuorumServers(quorumServers);
|
||||
|
||||
// Start quorum peer threads.
|
||||
for (QuorumPeer qp : quorumPeers) {
|
||||
qp.start();
|
||||
}
|
||||
|
||||
// Wait for quorum peers to be up before going on.
|
||||
for (int id = 1; id <= numPeers; ++id) {
|
||||
int port = clientPortStart + id;
|
||||
if (!waitForServerUp(port, CONNECTION_TIMEOUT)) {
|
||||
throw new IOException("Waiting for startup of peer " + id);
|
||||
}
|
||||
}
|
||||
started = true;
|
||||
}
|
||||
|
||||
private void recreateDir(File dir) throws IOException {
|
||||
|
@ -200,51 +119,20 @@ public class MiniZooKeeperCluster {
|
|||
|
||||
/**
|
||||
* @throws IOException
|
||||
* @throws InterruptedException
|
||||
*/
|
||||
public void shutdown() throws IOException, InterruptedException {
|
||||
public void shutdown() throws IOException {
|
||||
if (!started) {
|
||||
return;
|
||||
}
|
||||
|
||||
if (isDistributed()) {
|
||||
shutdownDistributed();
|
||||
} else {
|
||||
shutdownStandalone();
|
||||
standaloneServerFactory.shutdown();
|
||||
if (!waitForServerDown(clientPort, CONNECTION_TIMEOUT)) {
|
||||
throw new IOException("Waiting for shutdown of standalone server");
|
||||
}
|
||||
|
||||
started = false;
|
||||
}
|
||||
|
||||
private boolean isDistributed() {
|
||||
return numPeers > 1;
|
||||
}
|
||||
|
||||
private void shutdownDistributed() throws IOException, InterruptedException {
|
||||
for (QuorumPeer qp : quorumPeers) {
|
||||
qp.shutdown();
|
||||
qp.join(CONNECTION_TIMEOUT);
|
||||
if (qp.isAlive()) {
|
||||
throw new IOException("QuorumPeer " + qp.getId()
|
||||
+ " failed to shutdown");
|
||||
}
|
||||
}
|
||||
|
||||
for (int id = 1; id <= quorumPeers.length; ++id) {
|
||||
int port = clientPortStart + id;
|
||||
if (!waitForServerDown(port, CONNECTION_TIMEOUT)) {
|
||||
throw new IOException("Waiting for shutdown of peer " + id);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
private void shutdownStandalone() throws IOException {
|
||||
standaloneServerFactory.shutdown();
|
||||
if (!waitForServerDown(clientPortStart, CONNECTION_TIMEOUT)) {
|
||||
throw new IOException("Waiting for shutdown of standalone server");
|
||||
}
|
||||
}
|
||||
|
||||
// XXX: From o.a.zk.t.ClientBase
|
||||
private static boolean waitForServerDown(int port, long timeout) {
|
||||
long start = System.currentTimeMillis();
|
||||
|
|
|
@ -94,7 +94,7 @@ public class TestZooKeeper extends HBaseClusterTestCase {
|
|||
public void testClientSessionExpired() throws IOException, InterruptedException {
|
||||
new HTable(conf, HConstants.META_TABLE_NAME);
|
||||
|
||||
String quorumServers = zooKeeperCluster.getQuorumServers();
|
||||
String quorumServers = ZooKeeperWrapper.getQuorumServers();
|
||||
int sessionTimeout = conf.getInt("zookeeper.session.timeout", 2 * 1000);
|
||||
Watcher watcher = new EmptyWatcher();
|
||||
HConnection connection = HConnectionManager.getConnection(conf);
|
||||
|
@ -118,7 +118,7 @@ public class TestZooKeeper extends HBaseClusterTestCase {
|
|||
try {
|
||||
new HTable(conf, HConstants.META_TABLE_NAME);
|
||||
|
||||
String quorumServers = zooKeeperCluster.getQuorumServers();
|
||||
String quorumServers = ZooKeeperWrapper.getQuorumServers();
|
||||
int sessionTimeout = conf.getInt("zookeeper.session.timeout", 2 * 1000);
|
||||
|
||||
Watcher watcher = new EmptyWatcher();
|
||||
|
|
Loading…
Reference in New Issue