HBASE-3052 Add ability to have multiple ZK servers in a quorum in MiniZooKeeperCluster for test writing

git-svn-id: https://svn.apache.org/repos/asf/hbase/trunk@1085190 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Michael Stack 2011-03-24 23:20:24 +00:00
parent 91404d9c5d
commit cad0a1bb9c
5 changed files with 228 additions and 41 deletions

View File

@ -100,6 +100,8 @@ Release 0.91.0 - Unreleased
(Karthick Sankarachary via Stack) (Karthick Sankarachary via Stack)
HBASE-3474 HFileOutputFormat to use column family's compression algorithm HBASE-3474 HFileOutputFormat to use column family's compression algorithm
HBASE-3541 REST Multi Gets (Elliott Clark via Stack) HBASE-3541 REST Multi Gets (Elliott Clark via Stack)
HBASE-3052 Add ability to have multiple ZK servers in a quorum in
MiniZooKeeperCluster for test writing (Liyin Tang via Stack)
TASK TASK
HBASE-3559 Move report of split to master OFF the heartbeat channel HBASE-3559 Move report of split to master OFF the heartbeat channel

View File

@ -121,7 +121,7 @@ public class HMasterCommandLine extends ServerCommandLine {
if (zkClientPort == 0) { if (zkClientPort == 0) {
throw new IOException("No config value for hbase.zookeeper.property.clientPort"); throw new IOException("No config value for hbase.zookeeper.property.clientPort");
} }
zooKeeperCluster.setClientPort(zkClientPort); zooKeeperCluster.setDefaultClientPort(zkClientPort);
int clientPort = zooKeeperCluster.startup(zkDataPath); int clientPort = zooKeeperCluster.startup(zkDataPath);
if (clientPort != zkClientPort) { if (clientPort != zkClientPort) {
String errorMsg = "Couldnt start ZK at requested address of " + String errorMsg = "Couldnt start ZK at requested address of " +

View File

@ -28,6 +28,8 @@ import java.io.Reader;
import java.net.BindException; import java.net.BindException;
import java.net.InetSocketAddress; import java.net.InetSocketAddress;
import java.net.Socket; import java.net.Socket;
import java.util.ArrayList;
import java.util.List;
import org.apache.commons.logging.Log; import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory; import org.apache.commons.logging.LogFactory;
@ -48,28 +50,46 @@ public class MiniZooKeeperCluster {
private static final int CONNECTION_TIMEOUT = 30000; private static final int CONNECTION_TIMEOUT = 30000;
private boolean started; private boolean started;
private int clientPort = 21818; // use non-standard port
private NIOServerCnxn.Factory standaloneServerFactory; private int defaultClientPort = 21818; // use non-standard port
private int clientPort = defaultClientPort;
private List<NIOServerCnxn.Factory> standaloneServerFactoryList;
private List<ZooKeeperServer> zooKeeperServers;
private List<Integer> clientPortList;
private int activeZKServerIndex;
private int tickTime = 0; private int tickTime = 0;
/** Create mini ZooKeeper cluster. */ /** Create mini ZooKeeper cluster. */
public MiniZooKeeperCluster() { public MiniZooKeeperCluster() {
this.started = false; this.started = false;
activeZKServerIndex = -1;
zooKeeperServers = new ArrayList<ZooKeeperServer>();
clientPortList = new ArrayList<Integer>();
standaloneServerFactoryList = new ArrayList<NIOServerCnxn.Factory>();
} }
public void setClientPort(int clientPort) { public void setDefaultClientPort(int clientPort) {
this.clientPort = clientPort; this.defaultClientPort = clientPort;
} }
public int getClientPort() { public int getDefaultClientPort() {
return clientPort; return defaultClientPort;
} }
public void setTickTime(int tickTime) { public void setTickTime(int tickTime) {
this.tickTime = tickTime; this.tickTime = tickTime;
} }
public int getBackupZooKeeperServerNum() {
return zooKeeperServers.size()-1;
}
public int getZooKeeperServerNum() {
return zooKeeperServers.size();
}
// / XXX: From o.a.zk.t.ClientBase // / XXX: From o.a.zk.t.ClientBase
private static void setupTestEnv() { private static void setupTestEnv() {
// during the tests we run with 100K prealloc in the logs. // during the tests we run with 100K prealloc in the logs.
@ -80,22 +100,31 @@ public class MiniZooKeeperCluster {
FileTxnLog.setPreallocSize(100); FileTxnLog.setPreallocSize(100);
} }
public int startup(File baseDir) throws IOException,
InterruptedException {
return startup(baseDir,1);
}
/** /**
* @param baseDir * @param baseDir
* @param numZooKeeperServers
* @return ClientPort server bound to. * @return ClientPort server bound to.
* @throws IOException * @throws IOException
* @throws InterruptedException * @throws InterruptedException
*/ */
public int startup(File baseDir) throws IOException, public int startup(File baseDir, int numZooKeeperServers) throws IOException,
InterruptedException { InterruptedException {
if (numZooKeeperServers <= 0)
return -1;
setupTestEnv(); setupTestEnv();
shutdown(); shutdown();
File dir = new File(baseDir, "zookeeper").getAbsoluteFile(); // running all the ZK servers
for (int i = 0; i < numZooKeeperServers; i++) {
File dir = new File(baseDir, "zookeeper_"+i).getAbsoluteFile();
recreateDir(dir); recreateDir(dir);
clientPort = defaultClientPort;
int tickTimeToUse; int tickTimeToUse;
if (this.tickTime > 0) { if (this.tickTime > 0) {
tickTimeToUse = this.tickTime; tickTimeToUse = this.tickTime;
@ -103,6 +132,7 @@ public class MiniZooKeeperCluster {
tickTimeToUse = TICK_TIME; tickTimeToUse = TICK_TIME;
} }
ZooKeeperServer server = new ZooKeeperServer(dir, dir, tickTimeToUse); ZooKeeperServer server = new ZooKeeperServer(dir, dir, tickTimeToUse);
NIOServerCnxn.Factory standaloneServerFactory;
while (true) { while (true) {
try { try {
standaloneServerFactory = standaloneServerFactory =
@ -115,14 +145,24 @@ public class MiniZooKeeperCluster {
} }
break; break;
} }
standaloneServerFactory.startup(server);
// Start up this ZK server
standaloneServerFactory.startup(server);
if (!waitForServerUp(clientPort, CONNECTION_TIMEOUT)) { if (!waitForServerUp(clientPort, CONNECTION_TIMEOUT)) {
throw new IOException("Waiting for startup of standalone server"); throw new IOException("Waiting for startup of standalone server");
} }
clientPortList.add(clientPort);
standaloneServerFactoryList.add(standaloneServerFactory);
zooKeeperServers.add(server);
}
// set the first one to be active ZK; Others are backups
activeZKServerIndex = 0;
started = true; started = true;
LOG.info("Started MiniZK Server on client port: " + clientPort); clientPort = clientPortList.get(activeZKServerIndex);
LOG.info("Started MiniZK Cluster and connect 1 ZK server " +
"on client port: " + clientPort);
return clientPort; return clientPort;
} }
@ -144,13 +184,96 @@ public class MiniZooKeeperCluster {
if (!started) { if (!started) {
return; return;
} }
// shut down all the zk servers
for (int i = 0; i < standaloneServerFactoryList.size(); i++) {
NIOServerCnxn.Factory standaloneServerFactory =
standaloneServerFactoryList.get(i);
int clientPort = clientPortList.get(i);
standaloneServerFactory.shutdown();
if (!waitForServerDown(clientPort, CONNECTION_TIMEOUT)) {
throw new IOException("Waiting for shutdown of standalone server");
}
}
// clear everything
started = false;
activeZKServerIndex = 0;
standaloneServerFactoryList.clear();
clientPortList.clear();
zooKeeperServers.clear();
LOG.info("Shutdown MiniZK cluster with all ZK servers");
}
/**@return clientPort return clientPort if there is another ZK backup can run
* when killing the current active; return -1, if there is no backups.
* @throws IOException
* @throws InterruptedException
*/
public int killCurrentActiveZooKeeperServer() throws IOException,
InterruptedException {
if (!started || activeZKServerIndex < 0 ) {
return -1;
}
// Shutdown the current active one
NIOServerCnxn.Factory standaloneServerFactory =
standaloneServerFactoryList.get(activeZKServerIndex);
int clientPort = clientPortList.get(activeZKServerIndex);
standaloneServerFactory.shutdown(); standaloneServerFactory.shutdown();
if (!waitForServerDown(clientPort, CONNECTION_TIMEOUT)) { if (!waitForServerDown(clientPort, CONNECTION_TIMEOUT)) {
throw new IOException("Waiting for shutdown of standalone server"); throw new IOException("Waiting for shutdown of standalone server");
} }
started = false; // remove the current active zk server
standaloneServerFactoryList.remove(activeZKServerIndex);
clientPortList.remove(activeZKServerIndex);
zooKeeperServers.remove(activeZKServerIndex);
LOG.info("Kill the current active ZK servers in the cluster " +
"on client port: " + clientPort);
if (standaloneServerFactoryList.size() == 0) {
// there is no backup servers;
return -1;
}
clientPort = clientPortList.get(activeZKServerIndex);
LOG.info("Activate a backup zk server in the cluster " +
"on client port: " + clientPort);
// return the next back zk server's port
return clientPort;
}
/**
* Kill one back up ZK servers
* @throws IOException
* @throws InterruptedException
*/
public void killOneBackupZooKeeperServer() throws IOException,
InterruptedException {
if (!started || activeZKServerIndex < 0 ||
standaloneServerFactoryList.size() <= 1) {
return ;
}
int backupZKServerIndex = activeZKServerIndex+1;
// Shutdown the current active one
NIOServerCnxn.Factory standaloneServerFactory =
standaloneServerFactoryList.get(backupZKServerIndex);
int clientPort = clientPortList.get(backupZKServerIndex);
standaloneServerFactory.shutdown();
if (!waitForServerDown(clientPort, CONNECTION_TIMEOUT)) {
throw new IOException("Waiting for shutdown of standalone server");
}
// remove this backup zk server
standaloneServerFactoryList.remove(backupZKServerIndex);
clientPortList.remove(backupZKServerIndex);
zooKeeperServers.remove(backupZKServerIndex);
LOG.info("Kill one backup ZK servers in the cluster " +
"on client port: " + clientPort);
} }
// XXX: From o.a.zk.t.ClientBase // XXX: From o.a.zk.t.ClientBase

View File

@ -245,18 +245,38 @@ public class HBaseTestingUtility {
* @return zk cluster started. * @return zk cluster started.
*/ */
public MiniZooKeeperCluster startMiniZKCluster() throws Exception { public MiniZooKeeperCluster startMiniZKCluster() throws Exception {
return startMiniZKCluster(setupClusterTestBuildDir()); return startMiniZKCluster(setupClusterTestBuildDir(),1);
}
/**
* Call this if you only want a zk cluster.
* @param zooKeeperServerNum
* @see #startMiniZKCluster() if you want zk + dfs + hbase mini cluster.
* @throws Exception
* @see #shutdownMiniZKCluster()
* @return zk cluster started.
*/
public MiniZooKeeperCluster startMiniZKCluster(int zooKeeperServerNum)
throws Exception {
return startMiniZKCluster(setupClusterTestBuildDir(), zooKeeperServerNum);
} }
private MiniZooKeeperCluster startMiniZKCluster(final File dir) private MiniZooKeeperCluster startMiniZKCluster(final File dir)
throws Exception {
return startMiniZKCluster(dir,1);
}
private MiniZooKeeperCluster startMiniZKCluster(final File dir,
int zooKeeperServerNum)
throws Exception { throws Exception {
this.passedZkCluster = false; this.passedZkCluster = false;
if (this.zkCluster != null) { if (this.zkCluster != null) {
throw new IOException("Cluster already running at " + dir); throw new IOException("Cluster already running at " + dir);
} }
this.zkCluster = new MiniZooKeeperCluster(); this.zkCluster = new MiniZooKeeperCluster();
int clientPort = this.zkCluster.startup(dir); int clientPort = this.zkCluster.startup(dir,zooKeeperServerNum);
this.conf.set("hbase.zookeeper.property.clientPort", this.conf.set("hbase.zookeeper.property.clientPort",
Integer.toString(clientPort)); Integer.toString(clientPort));
return this.zkCluster; return this.zkCluster;

View File

@ -36,6 +36,7 @@ import org.apache.hadoop.hbase.client.HTable;
import org.apache.hadoop.hbase.client.Put; import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Result; import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.util.Bytes; import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.zookeeper.MiniZooKeeperCluster;
import org.apache.hadoop.hdfs.MiniDFSCluster; import org.apache.hadoop.hdfs.MiniDFSCluster;
import org.junit.After; import org.junit.After;
import org.junit.AfterClass; import org.junit.AfterClass;
@ -137,6 +138,47 @@ public class TestHBaseTestingUtility {
} }
} }
@Test public void testMiniZooKeeper() throws Exception {
MiniZooKeeperCluster cluster1 = this.hbt.startMiniZKCluster();
try {
assertEquals(0, cluster1.getBackupZooKeeperServerNum());
assertTrue((cluster1.killCurrentActiveZooKeeperServer() == -1));
} finally {
cluster1.shutdown();
}
this.hbt.shutdownMiniZKCluster();
// set up zookeeper cluster with 5 zk servers
MiniZooKeeperCluster cluster2 = this.hbt.startMiniZKCluster(5);
int defaultClientPort = 21818;
cluster2.setDefaultClientPort(defaultClientPort);
try {
assertEquals(4, cluster2.getBackupZooKeeperServerNum());
// killing the current active zk server
assertTrue((cluster2.killCurrentActiveZooKeeperServer() >= defaultClientPort));
assertTrue((cluster2.killCurrentActiveZooKeeperServer() >= defaultClientPort));
assertEquals(2, cluster2.getBackupZooKeeperServerNum());
assertEquals(3, cluster2.getZooKeeperServerNum());
// killing the backup zk servers
cluster2.killOneBackupZooKeeperServer();
cluster2.killOneBackupZooKeeperServer();
assertEquals(0, cluster2.getBackupZooKeeperServerNum());
assertEquals(1, cluster2.getZooKeeperServerNum());
// killing the last zk server
assertTrue((cluster2.killCurrentActiveZooKeeperServer() == -1));
// this should do nothing.
cluster2.killOneBackupZooKeeperServer();
assertEquals(-1, cluster2.getBackupZooKeeperServerNum());
assertEquals(0, cluster2.getZooKeeperServerNum());
} finally {
cluster2.shutdown();
}
}
@Test public void testMiniDFSCluster() throws Exception { @Test public void testMiniDFSCluster() throws Exception {
MiniDFSCluster cluster = this.hbt.startMiniDFSCluster(1); MiniDFSCluster cluster = this.hbt.startMiniDFSCluster(1);
FileSystem dfs = cluster.getFileSystem(); FileSystem dfs = cluster.getFileSystem();