HADOOP-8260. Replace ClientBaseWithFixes with our own modified copy of the class. Contributed by Todd Lipcon.
git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/branches/HDFS-3042@1310888 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
parent
97804bacd0
commit
3646705f7a
|
@ -13,3 +13,5 @@ HADOOP-8215. Security support for ZK Failover controller (todd)
|
|||
HADOOP-8245. Fix flakiness in TestZKFailoverController (todd)
|
||||
|
||||
HADOOP-8257. TestZKFailoverControllerStress occasionally fails with Mockito error (todd)
|
||||
|
||||
HADOOP-8260. Replace ClientBaseWithFixes with our own modified copy of the class (todd)
|
||||
|
|
|
@ -15,50 +15,438 @@
|
|||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package org.apache.hadoop.ha;
|
||||
|
||||
import java.io.BufferedReader;
|
||||
import java.io.File;
|
||||
import java.util.Set;
|
||||
import java.io.IOException;
|
||||
import java.io.InputStreamReader;
|
||||
import java.io.OutputStream;
|
||||
import java.net.Socket;
|
||||
import java.util.ArrayList;
|
||||
import java.util.Arrays;
|
||||
import java.util.LinkedList;
|
||||
import java.util.List;
|
||||
import java.util.concurrent.CountDownLatch;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
import java.util.concurrent.TimeoutException;
|
||||
|
||||
import javax.management.ObjectName;
|
||||
|
||||
import org.apache.zookeeper.test.ClientBase;
|
||||
import org.apache.zookeeper.test.JMXEnv;
|
||||
import org.apache.zookeeper.PortAssignment;
|
||||
import org.apache.zookeeper.TestableZooKeeper;
|
||||
import org.apache.zookeeper.WatchedEvent;
|
||||
import org.apache.zookeeper.Watcher;
|
||||
import org.apache.zookeeper.Watcher.Event.KeeperState;
|
||||
import org.apache.zookeeper.ZKTestCase;
|
||||
import org.apache.zookeeper.ZooKeeper;
|
||||
import org.apache.zookeeper.server.ServerCnxnFactory;
|
||||
import org.apache.zookeeper.server.ServerCnxnFactoryAccessor;
|
||||
import org.apache.zookeeper.server.ZKDatabase;
|
||||
import org.apache.zookeeper.server.ZooKeeperServer;
|
||||
import org.apache.zookeeper.server.persistence.FileTxnLog;
|
||||
import org.junit.After;
|
||||
import org.junit.Assert;
|
||||
import org.junit.Before;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
|
||||
/**
|
||||
* A subclass of ZK's ClientBase testing utility, with some fixes
|
||||
* necessary for running in the Hadoop context.
|
||||
* Copy-paste of ClientBase from ZooKeeper, but without any of the
|
||||
* JMXEnv verification. There seems to be a bug ZOOKEEPER-1438
|
||||
* which causes spurious failures in the JMXEnv verification when
|
||||
* we run these tests with the upstream ClientBase.
|
||||
*/
|
||||
public class ClientBaseWithFixes extends ClientBase {
|
||||
public abstract class ClientBaseWithFixes extends ZKTestCase {
|
||||
protected static final Logger LOG = LoggerFactory.getLogger(ClientBaseWithFixes.class);
|
||||
|
||||
/**
|
||||
* When running on the Jenkins setup, we need to ensure that this
|
||||
* build directory exists before running the tests.
|
||||
*/
|
||||
@Override
|
||||
public void setUp() throws Exception {
|
||||
// build.test.dir is used by zookeeper
|
||||
new File(System.getProperty("build.test.dir", "build")).mkdirs();
|
||||
super.setUp();
|
||||
}
|
||||
|
||||
/**
|
||||
* ZK seems to have a bug when we muck with its sessions
|
||||
* behind its back, causing disconnects, etc. This bug
|
||||
* ends up leaving JMX beans around at the end of the test,
|
||||
* and ClientBase's teardown method will throw an exception
|
||||
* if it finds JMX beans leaked. So, clear them out there
|
||||
* to workaround the ZK bug. See ZOOKEEPER-1438.
|
||||
*/
|
||||
@Override
|
||||
public void tearDown() throws Exception {
|
||||
Set<ObjectName> names = JMXEnv.ensureAll();
|
||||
for (ObjectName n : names) {
|
||||
try {
|
||||
JMXEnv.conn().unregisterMBean(n);
|
||||
} catch (Throwable t) {
|
||||
// ignore
|
||||
}
|
||||
public static int CONNECTION_TIMEOUT = 30000;
|
||||
static final File BASETEST =
|
||||
new File(System.getProperty("build.test.dir", "build"));
|
||||
|
||||
protected String hostPort = "127.0.0.1:" + PortAssignment.unique();
|
||||
protected int maxCnxns = 0;
|
||||
protected ServerCnxnFactory serverFactory = null;
|
||||
protected File tmpDir = null;
|
||||
|
||||
long initialFdCount;
|
||||
|
||||
public ClientBaseWithFixes() {
|
||||
super();
|
||||
}
|
||||
|
||||
/**
|
||||
* In general don't use this. Only use in the special case that you
|
||||
* want to ignore results (for whatever reason) in your test. Don't
|
||||
* use empty watchers in real code!
|
||||
*
|
||||
*/
|
||||
protected class NullWatcher implements Watcher {
|
||||
public void process(WatchedEvent event) { /* nada */ }
|
||||
}
|
||||
|
||||
protected static class CountdownWatcher implements Watcher {
|
||||
// XXX this doesn't need to be volatile! (Should probably be final)
|
||||
volatile CountDownLatch clientConnected;
|
||||
volatile boolean connected;
|
||||
|
||||
public CountdownWatcher() {
|
||||
reset();
|
||||
}
|
||||
synchronized public void reset() {
|
||||
clientConnected = new CountDownLatch(1);
|
||||
connected = false;
|
||||
}
|
||||
synchronized public void process(WatchedEvent event) {
|
||||
if (event.getState() == KeeperState.SyncConnected ||
|
||||
event.getState() == KeeperState.ConnectedReadOnly) {
|
||||
connected = true;
|
||||
notifyAll();
|
||||
clientConnected.countDown();
|
||||
} else {
|
||||
connected = false;
|
||||
notifyAll();
|
||||
}
|
||||
}
|
||||
synchronized boolean isConnected() {
|
||||
return connected;
|
||||
}
|
||||
synchronized void waitForConnected(long timeout) throws InterruptedException, TimeoutException {
|
||||
long expire = System.currentTimeMillis() + timeout;
|
||||
long left = timeout;
|
||||
while(!connected && left > 0) {
|
||||
wait(left);
|
||||
left = expire - System.currentTimeMillis();
|
||||
}
|
||||
if (!connected) {
|
||||
throw new TimeoutException("Did not connect");
|
||||
|
||||
}
|
||||
}
|
||||
synchronized void waitForDisconnected(long timeout) throws InterruptedException, TimeoutException {
|
||||
long expire = System.currentTimeMillis() + timeout;
|
||||
long left = timeout;
|
||||
while(connected && left > 0) {
|
||||
wait(left);
|
||||
left = expire - System.currentTimeMillis();
|
||||
}
|
||||
if (connected) {
|
||||
throw new TimeoutException("Did not disconnect");
|
||||
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
protected TestableZooKeeper createClient()
|
||||
throws IOException, InterruptedException
|
||||
{
|
||||
return createClient(hostPort);
|
||||
}
|
||||
|
||||
protected TestableZooKeeper createClient(String hp)
|
||||
throws IOException, InterruptedException
|
||||
{
|
||||
CountdownWatcher watcher = new CountdownWatcher();
|
||||
return createClient(watcher, hp);
|
||||
}
|
||||
|
||||
private LinkedList<ZooKeeper> allClients;
|
||||
private boolean allClientsSetup = false;
|
||||
|
||||
protected TestableZooKeeper createClient(CountdownWatcher watcher, String hp)
|
||||
throws IOException, InterruptedException
|
||||
{
|
||||
return createClient(watcher, hp, CONNECTION_TIMEOUT);
|
||||
}
|
||||
|
||||
protected TestableZooKeeper createClient(CountdownWatcher watcher,
|
||||
String hp, int timeout)
|
||||
throws IOException, InterruptedException
|
||||
{
|
||||
watcher.reset();
|
||||
TestableZooKeeper zk = new TestableZooKeeper(hp, timeout, watcher);
|
||||
if (!watcher.clientConnected.await(timeout, TimeUnit.MILLISECONDS))
|
||||
{
|
||||
Assert.fail("Unable to connect to server");
|
||||
}
|
||||
synchronized(this) {
|
||||
if (!allClientsSetup) {
|
||||
LOG.error("allClients never setup");
|
||||
Assert.fail("allClients never setup");
|
||||
}
|
||||
if (allClients != null) {
|
||||
allClients.add(zk);
|
||||
} else {
|
||||
// test done - close the zk, not needed
|
||||
zk.close();
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
return zk;
|
||||
}
|
||||
|
||||
public static class HostPort {
|
||||
String host;
|
||||
int port;
|
||||
public HostPort(String host, int port) {
|
||||
this.host = host;
|
||||
this.port = port;
|
||||
}
|
||||
}
|
||||
public static List<HostPort> parseHostPortList(String hplist) {
|
||||
ArrayList<HostPort> alist = new ArrayList<HostPort>();
|
||||
for (String hp: hplist.split(",")) {
|
||||
int idx = hp.lastIndexOf(':');
|
||||
String host = hp.substring(0, idx);
|
||||
int port;
|
||||
try {
|
||||
port = Integer.parseInt(hp.substring(idx + 1));
|
||||
} catch(RuntimeException e) {
|
||||
throw new RuntimeException("Problem parsing " + hp + e.toString());
|
||||
}
|
||||
alist.add(new HostPort(host,port));
|
||||
}
|
||||
return alist;
|
||||
}
|
||||
|
||||
/**
|
||||
* Send the 4letterword
|
||||
* @param host the destination host
|
||||
* @param port the destination port
|
||||
* @param cmd the 4letterword
|
||||
* @return
|
||||
* @throws IOException
|
||||
*/
|
||||
public static String send4LetterWord(String host, int port, String cmd)
|
||||
throws IOException
|
||||
{
|
||||
LOG.info("connecting to " + host + " " + port);
|
||||
Socket sock = new Socket(host, port);
|
||||
BufferedReader reader = null;
|
||||
try {
|
||||
OutputStream outstream = sock.getOutputStream();
|
||||
outstream.write(cmd.getBytes());
|
||||
outstream.flush();
|
||||
// this replicates NC - close the output stream before reading
|
||||
sock.shutdownOutput();
|
||||
|
||||
reader =
|
||||
new BufferedReader(
|
||||
new InputStreamReader(sock.getInputStream()));
|
||||
StringBuilder sb = new StringBuilder();
|
||||
String line;
|
||||
while((line = reader.readLine()) != null) {
|
||||
sb.append(line + "\n");
|
||||
}
|
||||
return sb.toString();
|
||||
} finally {
|
||||
sock.close();
|
||||
if (reader != null) {
|
||||
reader.close();
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
public static boolean waitForServerUp(String hp, long timeout) {
|
||||
long start = System.currentTimeMillis();
|
||||
while (true) {
|
||||
try {
|
||||
// if there are multiple hostports, just take the first one
|
||||
HostPort hpobj = parseHostPortList(hp).get(0);
|
||||
String result = send4LetterWord(hpobj.host, hpobj.port, "stat");
|
||||
if (result.startsWith("Zookeeper version:") &&
|
||||
!result.contains("READ-ONLY")) {
|
||||
return true;
|
||||
}
|
||||
} catch (IOException e) {
|
||||
// ignore as this is expected
|
||||
LOG.info("server " + hp + " not up " + e);
|
||||
}
|
||||
|
||||
if (System.currentTimeMillis() > start + timeout) {
|
||||
break;
|
||||
}
|
||||
try {
|
||||
Thread.sleep(250);
|
||||
} catch (InterruptedException e) {
|
||||
// ignore
|
||||
}
|
||||
}
|
||||
return false;
|
||||
}
|
||||
public static boolean waitForServerDown(String hp, long timeout) {
|
||||
long start = System.currentTimeMillis();
|
||||
while (true) {
|
||||
try {
|
||||
HostPort hpobj = parseHostPortList(hp).get(0);
|
||||
send4LetterWord(hpobj.host, hpobj.port, "stat");
|
||||
} catch (IOException e) {
|
||||
return true;
|
||||
}
|
||||
|
||||
if (System.currentTimeMillis() > start + timeout) {
|
||||
break;
|
||||
}
|
||||
try {
|
||||
Thread.sleep(250);
|
||||
} catch (InterruptedException e) {
|
||||
// ignore
|
||||
}
|
||||
}
|
||||
return false;
|
||||
}
|
||||
|
||||
public static File createTmpDir() throws IOException {
|
||||
return createTmpDir(BASETEST);
|
||||
}
|
||||
static File createTmpDir(File parentDir) throws IOException {
|
||||
File tmpFile = File.createTempFile("test", ".junit", parentDir);
|
||||
// don't delete tmpFile - this ensures we don't attempt to create
|
||||
// a tmpDir with a duplicate name
|
||||
File tmpDir = new File(tmpFile + ".dir");
|
||||
Assert.assertFalse(tmpDir.exists()); // never true if tmpfile does it's job
|
||||
Assert.assertTrue(tmpDir.mkdirs());
|
||||
|
||||
return tmpDir;
|
||||
}
|
||||
private static int getPort(String hostPort) {
|
||||
String[] split = hostPort.split(":");
|
||||
String portstr = split[split.length-1];
|
||||
String[] pc = portstr.split("/");
|
||||
if (pc.length > 1) {
|
||||
portstr = pc[0];
|
||||
}
|
||||
return Integer.parseInt(portstr);
|
||||
}
|
||||
|
||||
static ServerCnxnFactory createNewServerInstance(File dataDir,
|
||||
ServerCnxnFactory factory, String hostPort, int maxCnxns)
|
||||
throws IOException, InterruptedException
|
||||
{
|
||||
ZooKeeperServer zks = new ZooKeeperServer(dataDir, dataDir, 3000);
|
||||
final int PORT = getPort(hostPort);
|
||||
if (factory == null) {
|
||||
factory = ServerCnxnFactory.createFactory(PORT, maxCnxns);
|
||||
}
|
||||
factory.startup(zks);
|
||||
Assert.assertTrue("waiting for server up",
|
||||
ClientBaseWithFixes.waitForServerUp("127.0.0.1:" + PORT,
|
||||
CONNECTION_TIMEOUT));
|
||||
|
||||
return factory;
|
||||
}
|
||||
|
||||
static void shutdownServerInstance(ServerCnxnFactory factory,
|
||||
String hostPort)
|
||||
{
|
||||
if (factory != null) {
|
||||
ZKDatabase zkDb;
|
||||
{
|
||||
ZooKeeperServer zs = getServer(factory);
|
||||
|
||||
zkDb = zs.getZKDatabase();
|
||||
}
|
||||
factory.shutdown();
|
||||
try {
|
||||
zkDb.close();
|
||||
} catch (IOException ie) {
|
||||
LOG.warn("Error closing logs ", ie);
|
||||
}
|
||||
final int PORT = getPort(hostPort);
|
||||
|
||||
Assert.assertTrue("waiting for server down",
|
||||
ClientBaseWithFixes.waitForServerDown("127.0.0.1:" + PORT,
|
||||
CONNECTION_TIMEOUT));
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Test specific setup
|
||||
*/
|
||||
public static void setupTestEnv() {
|
||||
// during the tests we run with 100K prealloc in the logs.
|
||||
// on windows systems prealloc of 64M was seen to take ~15seconds
|
||||
// resulting in test Assert.failure (client timeout on first session).
|
||||
// set env and directly in order to handle static init/gc issues
|
||||
System.setProperty("zookeeper.preAllocSize", "100");
|
||||
FileTxnLog.setPreallocSize(100 * 1024);
|
||||
}
|
||||
|
||||
protected void setUpAll() throws Exception {
|
||||
allClients = new LinkedList<ZooKeeper>();
|
||||
allClientsSetup = true;
|
||||
}
|
||||
|
||||
@Before
|
||||
public void setUp() throws Exception {
|
||||
BASETEST.mkdirs();
|
||||
|
||||
setupTestEnv();
|
||||
|
||||
setUpAll();
|
||||
|
||||
tmpDir = createTmpDir(BASETEST);
|
||||
|
||||
startServer();
|
||||
|
||||
LOG.info("Client test setup finished");
|
||||
}
|
||||
|
||||
protected void startServer() throws Exception {
|
||||
LOG.info("STARTING server");
|
||||
serverFactory = createNewServerInstance(tmpDir, serverFactory, hostPort, maxCnxns);
|
||||
}
|
||||
|
||||
protected void stopServer() throws Exception {
|
||||
LOG.info("STOPPING server");
|
||||
shutdownServerInstance(serverFactory, hostPort);
|
||||
serverFactory = null;
|
||||
}
|
||||
|
||||
|
||||
protected static ZooKeeperServer getServer(ServerCnxnFactory fac) {
|
||||
ZooKeeperServer zs = ServerCnxnFactoryAccessor.getZkServer(fac);
|
||||
|
||||
return zs;
|
||||
}
|
||||
|
||||
protected void tearDownAll() throws Exception {
|
||||
synchronized (this) {
|
||||
if (allClients != null) for (ZooKeeper zk : allClients) {
|
||||
try {
|
||||
if (zk != null)
|
||||
zk.close();
|
||||
} catch (InterruptedException e) {
|
||||
LOG.warn("ignoring interrupt", e);
|
||||
}
|
||||
}
|
||||
allClients = null;
|
||||
}
|
||||
}
|
||||
|
||||
@After
|
||||
public void tearDown() throws Exception {
|
||||
LOG.info("tearDown starting");
|
||||
|
||||
tearDownAll();
|
||||
|
||||
stopServer();
|
||||
|
||||
if (tmpDir != null) {
|
||||
Assert.assertTrue("delete " + tmpDir.toString(), recursiveDelete(tmpDir));
|
||||
}
|
||||
|
||||
// This has to be set to null when the same instance of this class is reused between test cases
|
||||
serverFactory = null;
|
||||
}
|
||||
|
||||
public static boolean recursiveDelete(File d) {
|
||||
if (d.isDirectory()) {
|
||||
File children[] = d.listFiles();
|
||||
for (File f : children) {
|
||||
Assert.assertTrue("delete " + f.toString(), recursiveDelete(f));
|
||||
}
|
||||
}
|
||||
return d.delete();
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue