HADOOP-18515. Backport HADOOP-17612 to branch-3.3(Upgrade Zookeeper to 3.6.3 and Curator to 5.2.0) (#5097)
* HADOOP-17612. Upgrade Zookeeper to 3.6.3 and Curator to 5.2.0 (#3241) Signed-off-by: Akira Ajisaka <aajisaka@apache.org> Co-authored-by: Viraj Jasani <vjasani@apache.org> Co-authored-by: Melissa You <myou@myou-mn1.linkedin.biz>
This commit is contained in:
parent
7b84f6458b
commit
853ffb545a
|
@ -316,9 +316,9 @@ org.apache.commons:commons-lang3:3.12.0
|
|||
org.apache.commons:commons-math3:3.1.1
|
||||
org.apache.commons:commons-text:1.10.0
|
||||
org.apache.commons:commons-validator:1.6
|
||||
org.apache.curator:curator-client:4.2.0
|
||||
org.apache.curator:curator-framework:4.2.0
|
||||
org.apache.curator:curator-recipes:4.2.0
|
||||
org.apache.curator:curator-client:5.2.0
|
||||
org.apache.curator:curator-framework:5.2.0
|
||||
org.apache.curator:curator-recipes:5.2.0
|
||||
org.apache.geronimo.specs:geronimo-jcache_1.0_spec:1.0-alpha-1
|
||||
org.apache.hbase:hbase-annotations:1.4.8
|
||||
org.apache.hbase:hbase-client:1.4.8
|
||||
|
@ -345,8 +345,6 @@ org.apache.kerby:kerby-util:1.0.1
|
|||
org.apache.kerby:kerby-xdr:1.0.1
|
||||
org.apache.kerby:token-provider:1.0.1
|
||||
org.apache.yetus:audience-annotations:0.5.0
|
||||
org.apache.zookeeper:zookeeper:3.5.6
|
||||
org.apache.zookeeper:zookeeper-jute:3.5.6
|
||||
org.codehaus.jettison:jettison:1.5.1
|
||||
org.eclipse.jetty:jetty-annotations:9.4.48.v20220622
|
||||
org.eclipse.jetty:jetty-http:9.4.48.v20220622
|
||||
|
@ -362,6 +360,7 @@ org.eclipse.jetty:jetty-webapp:9.4.48.v20220622
|
|||
org.eclipse.jetty:jetty-xml:9.4.48.v20220622
|
||||
org.eclipse.jetty.websocket:javax-websocket-client-impl:9.4.48.v20220622
|
||||
org.eclipse.jetty.websocket:javax-websocket-server-impl:9.4.48.v20220622
|
||||
org.apache.zookeeper:zookeeper:3.6.3
|
||||
org.ehcache:ehcache:3.3.1
|
||||
org.lz4:lz4-java:1.7.1
|
||||
org.objenesis:objenesis:2.6
|
||||
|
|
|
@ -128,6 +128,15 @@
|
|||
<groupId>org.apache.zookeeper</groupId>
|
||||
<artifactId>zookeeper</artifactId>
|
||||
</dependency>
|
||||
<dependency>
|
||||
<groupId>io.dropwizard.metrics</groupId>
|
||||
<artifactId>metrics-core</artifactId>
|
||||
</dependency>
|
||||
<dependency>
|
||||
<groupId>org.xerial.snappy</groupId>
|
||||
<artifactId>snappy-java</artifactId>
|
||||
<scope>provided</scope>
|
||||
</dependency>
|
||||
<dependency>
|
||||
<groupId>org.apache.curator</groupId>
|
||||
<artifactId>curator-framework</artifactId>
|
||||
|
|
|
@ -336,6 +336,10 @@
|
|||
</exclusion>
|
||||
</exclusions>
|
||||
</dependency>
|
||||
<dependency>
|
||||
<groupId>io.dropwizard.metrics</groupId>
|
||||
<artifactId>metrics-core</artifactId>
|
||||
</dependency>
|
||||
<dependency>
|
||||
<groupId>org.apache.zookeeper</groupId>
|
||||
<artifactId>zookeeper</artifactId>
|
||||
|
|
|
@ -41,7 +41,6 @@ import org.apache.zookeeper.Watcher.Event.KeeperState;
|
|||
import org.apache.zookeeper.ZKTestCase;
|
||||
import org.apache.zookeeper.ZooKeeper;
|
||||
import org.apache.zookeeper.server.ServerCnxnFactory;
|
||||
import org.apache.zookeeper.server.ServerCnxnFactoryAccessor;
|
||||
import org.apache.zookeeper.server.ZKDatabase;
|
||||
import org.apache.zookeeper.server.ZooKeeperServer;
|
||||
import org.apache.zookeeper.server.persistence.FileTxnLog;
|
||||
|
@ -60,10 +59,10 @@ import org.apache.hadoop.thirdparty.com.google.common.annotations.VisibleForTest
|
|||
* we run these tests with the upstream ClientBase.
|
||||
*/
|
||||
public abstract class ClientBaseWithFixes extends ZKTestCase {
|
||||
protected static final Logger LOG = LoggerFactory.getLogger(ClientBaseWithFixes.class);
|
||||
protected static final Logger LOG = LoggerFactory.getLogger(ClientBaseWithFixes.class);
|
||||
|
||||
public static int CONNECTION_TIMEOUT = 30000;
|
||||
static final File BASETEST = GenericTestUtils.getTestDir();
|
||||
public static int CONNECTION_TIMEOUT = 30000;
|
||||
static final File BASETEST = GenericTestUtils.getTestDir();
|
||||
|
||||
static {
|
||||
// The 4-letter-words commands are simple diagnostics telnet commands in
|
||||
|
@ -74,411 +73,409 @@ public abstract class ClientBaseWithFixes extends ZKTestCase {
|
|||
System.setProperty("zookeeper.4lw.commands.whitelist", "*");
|
||||
}
|
||||
|
||||
protected final String hostPort = initHostPort();
|
||||
protected int maxCnxns = 0;
|
||||
protected ServerCnxnFactory serverFactory = null;
|
||||
protected File tmpDir = null;
|
||||
protected final String hostPort = initHostPort();
|
||||
protected int maxCnxns = 0;
|
||||
protected ServerCnxnFactory serverFactory = null;
|
||||
protected File tmpDir = null;
|
||||
|
||||
long initialFdCount;
|
||||
|
||||
long initialFdCount;
|
||||
|
||||
/**
|
||||
* In general don't use this. Only use in the special case that you
|
||||
* want to ignore results (for whatever reason) in your test. Don't
|
||||
* use empty watchers in real code!
|
||||
*
|
||||
*/
|
||||
protected class NullWatcher implements Watcher {
|
||||
@Override
|
||||
public void process(WatchedEvent event) { /* nada */ }
|
||||
/**
|
||||
* In general don't use this. Only use in the special case that you
|
||||
* want to ignore results (for whatever reason) in your test. Don't
|
||||
* use empty watchers in real code!
|
||||
*
|
||||
*/
|
||||
protected class NullWatcher implements Watcher {
|
||||
@Override
|
||||
public void process(WatchedEvent event) { /* nada */ }
|
||||
}
|
||||
|
||||
protected static class CountdownWatcher implements Watcher {
|
||||
// XXX this doesn't need to be volatile! (Should probably be final)
|
||||
volatile CountDownLatch clientConnected;
|
||||
volatile boolean connected;
|
||||
protected ZooKeeper client;
|
||||
|
||||
public void initializeWatchedClient(ZooKeeper zk) {
|
||||
if (client != null) {
|
||||
throw new RuntimeException("Watched Client was already set");
|
||||
}
|
||||
client = zk;
|
||||
}
|
||||
|
||||
protected static class CountdownWatcher implements Watcher {
|
||||
// XXX this doesn't need to be volatile! (Should probably be final)
|
||||
volatile CountDownLatch clientConnected;
|
||||
volatile boolean connected;
|
||||
protected ZooKeeper client;
|
||||
|
||||
public void initializeWatchedClient(ZooKeeper zk) {
|
||||
if (client != null) {
|
||||
throw new RuntimeException("Watched Client was already set");
|
||||
}
|
||||
client = zk;
|
||||
}
|
||||
|
||||
public CountdownWatcher() {
|
||||
reset();
|
||||
}
|
||||
synchronized public void reset() {
|
||||
clientConnected = new CountDownLatch(1);
|
||||
connected = false;
|
||||
}
|
||||
@Override
|
||||
synchronized public void process(WatchedEvent event) {
|
||||
if (event.getState() == KeeperState.SyncConnected ||
|
||||
event.getState() == KeeperState.ConnectedReadOnly) {
|
||||
connected = true;
|
||||
notifyAll();
|
||||
clientConnected.countDown();
|
||||
} else {
|
||||
connected = false;
|
||||
notifyAll();
|
||||
}
|
||||
}
|
||||
synchronized boolean isConnected() {
|
||||
return connected;
|
||||
}
|
||||
@VisibleForTesting
|
||||
public synchronized void waitForConnected(long timeout)
|
||||
throws InterruptedException, TimeoutException {
|
||||
long expire = Time.now() + timeout;
|
||||
long left = timeout;
|
||||
while(!connected && left > 0) {
|
||||
wait(left);
|
||||
left = expire - Time.now();
|
||||
}
|
||||
if (!connected) {
|
||||
throw new TimeoutException("Did not connect");
|
||||
|
||||
}
|
||||
}
|
||||
@VisibleForTesting
|
||||
public synchronized void waitForDisconnected(long timeout)
|
||||
throws InterruptedException, TimeoutException {
|
||||
long expire = Time.now() + timeout;
|
||||
long left = timeout;
|
||||
while(connected && left > 0) {
|
||||
wait(left);
|
||||
left = expire - Time.now();
|
||||
}
|
||||
if (connected) {
|
||||
throw new TimeoutException("Did not disconnect");
|
||||
|
||||
}
|
||||
}
|
||||
public CountdownWatcher() {
|
||||
reset();
|
||||
}
|
||||
synchronized public void reset() {
|
||||
clientConnected = new CountDownLatch(1);
|
||||
connected = false;
|
||||
}
|
||||
@Override
|
||||
synchronized public void process(WatchedEvent event) {
|
||||
if (event.getState() == KeeperState.SyncConnected ||
|
||||
event.getState() == KeeperState.ConnectedReadOnly) {
|
||||
connected = true;
|
||||
notifyAll();
|
||||
clientConnected.countDown();
|
||||
} else {
|
||||
connected = false;
|
||||
notifyAll();
|
||||
}
|
||||
}
|
||||
synchronized boolean isConnected() {
|
||||
return connected;
|
||||
}
|
||||
@VisibleForTesting
|
||||
public synchronized void waitForConnected(long timeout)
|
||||
throws InterruptedException, TimeoutException {
|
||||
long expire = Time.now() + timeout;
|
||||
long left = timeout;
|
||||
while(!connected && left > 0) {
|
||||
wait(left);
|
||||
left = expire - Time.now();
|
||||
}
|
||||
if (!connected) {
|
||||
throw new TimeoutException("Did not connect");
|
||||
|
||||
protected TestableZooKeeper createClient()
|
||||
throws IOException, InterruptedException
|
||||
}
|
||||
}
|
||||
@VisibleForTesting
|
||||
public synchronized void waitForDisconnected(long timeout)
|
||||
throws InterruptedException, TimeoutException {
|
||||
long expire = Time.now() + timeout;
|
||||
long left = timeout;
|
||||
while(connected && left > 0) {
|
||||
wait(left);
|
||||
left = expire - Time.now();
|
||||
}
|
||||
if (connected) {
|
||||
throw new TimeoutException("Did not disconnect");
|
||||
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
protected TestableZooKeeper createClient()
|
||||
throws IOException, InterruptedException
|
||||
{
|
||||
return createClient(hostPort);
|
||||
}
|
||||
|
||||
protected TestableZooKeeper createClient(String hp)
|
||||
throws IOException, InterruptedException
|
||||
{
|
||||
CountdownWatcher watcher = new CountdownWatcher();
|
||||
return createClient(watcher, hp);
|
||||
}
|
||||
|
||||
private LinkedList<ZooKeeper> allClients;
|
||||
private boolean allClientsSetup = false;
|
||||
|
||||
protected TestableZooKeeper createClient(CountdownWatcher watcher, String hp)
|
||||
throws IOException, InterruptedException
|
||||
{
|
||||
return createClient(watcher, hp, CONNECTION_TIMEOUT);
|
||||
}
|
||||
|
||||
protected TestableZooKeeper createClient(CountdownWatcher watcher,
|
||||
String hp, int timeout)
|
||||
throws IOException, InterruptedException
|
||||
{
|
||||
watcher.reset();
|
||||
TestableZooKeeper zk = new TestableZooKeeper(hp, timeout, watcher);
|
||||
if (!watcher.clientConnected.await(timeout, TimeUnit.MILLISECONDS))
|
||||
{
|
||||
return createClient(hostPort);
|
||||
Assert.fail("Unable to connect to server");
|
||||
}
|
||||
|
||||
protected TestableZooKeeper createClient(String hp)
|
||||
throws IOException, InterruptedException
|
||||
{
|
||||
CountdownWatcher watcher = new CountdownWatcher();
|
||||
return createClient(watcher, hp);
|
||||
synchronized(this) {
|
||||
if (!allClientsSetup) {
|
||||
LOG.error("allClients never setup");
|
||||
Assert.fail("allClients never setup");
|
||||
}
|
||||
if (allClients != null) {
|
||||
allClients.add(zk);
|
||||
} else {
|
||||
// test done - close the zk, not needed
|
||||
zk.close();
|
||||
}
|
||||
}
|
||||
watcher.initializeWatchedClient(zk);
|
||||
return zk;
|
||||
}
|
||||
|
||||
private LinkedList<ZooKeeper> allClients;
|
||||
private boolean allClientsSetup = false;
|
||||
|
||||
protected TestableZooKeeper createClient(CountdownWatcher watcher, String hp)
|
||||
throws IOException, InterruptedException
|
||||
{
|
||||
return createClient(watcher, hp, CONNECTION_TIMEOUT);
|
||||
public static class HostPort {
|
||||
String host;
|
||||
int port;
|
||||
public HostPort(String host, int port) {
|
||||
this.host = host;
|
||||
this.port = port;
|
||||
}
|
||||
|
||||
protected TestableZooKeeper createClient(CountdownWatcher watcher,
|
||||
String hp, int timeout)
|
||||
throws IOException, InterruptedException
|
||||
{
|
||||
watcher.reset();
|
||||
TestableZooKeeper zk = new TestableZooKeeper(hp, timeout, watcher);
|
||||
if (!watcher.clientConnected.await(timeout, TimeUnit.MILLISECONDS))
|
||||
{
|
||||
Assert.fail("Unable to connect to server");
|
||||
}
|
||||
synchronized(this) {
|
||||
if (!allClientsSetup) {
|
||||
LOG.error("allClients never setup");
|
||||
Assert.fail("allClients never setup");
|
||||
}
|
||||
if (allClients != null) {
|
||||
allClients.add(zk);
|
||||
} else {
|
||||
// test done - close the zk, not needed
|
||||
zk.close();
|
||||
}
|
||||
}
|
||||
watcher.initializeWatchedClient(zk);
|
||||
return zk;
|
||||
}
|
||||
public static List<HostPort> parseHostPortList(String hplist) {
|
||||
ArrayList<HostPort> alist = new ArrayList<HostPort>();
|
||||
for (String hp: hplist.split(",")) {
|
||||
int idx = hp.lastIndexOf(':');
|
||||
String host = hp.substring(0, idx);
|
||||
int port;
|
||||
try {
|
||||
port = Integer.parseInt(hp.substring(idx + 1));
|
||||
} catch(RuntimeException e) {
|
||||
throw new RuntimeException("Problem parsing " + hp + e.toString());
|
||||
}
|
||||
alist.add(new HostPort(host,port));
|
||||
}
|
||||
return alist;
|
||||
}
|
||||
|
||||
public static class HostPort {
|
||||
String host;
|
||||
int port;
|
||||
public HostPort(String host, int port) {
|
||||
this.host = host;
|
||||
this.port = port;
|
||||
}
|
||||
/**
|
||||
* Send the 4letterword
|
||||
* @param host the destination host
|
||||
* @param port the destination port
|
||||
* @param cmd the 4letterword
|
||||
* @return
|
||||
* @throws IOException
|
||||
*/
|
||||
public static String send4LetterWord(String host, int port, String cmd)
|
||||
throws IOException
|
||||
{
|
||||
LOG.info("connecting to " + host + " " + port);
|
||||
Socket sock = new Socket(host, port);
|
||||
BufferedReader reader = null;
|
||||
try {
|
||||
OutputStream outstream = sock.getOutputStream();
|
||||
outstream.write(cmd.getBytes());
|
||||
outstream.flush();
|
||||
// this replicates NC - close the output stream before reading
|
||||
sock.shutdownOutput();
|
||||
|
||||
reader =
|
||||
new BufferedReader(
|
||||
new InputStreamReader(sock.getInputStream()));
|
||||
StringBuilder sb = new StringBuilder();
|
||||
String line;
|
||||
while((line = reader.readLine()) != null) {
|
||||
sb.append(line + "\n");
|
||||
}
|
||||
return sb.toString();
|
||||
} finally {
|
||||
sock.close();
|
||||
if (reader != null) {
|
||||
reader.close();
|
||||
}
|
||||
}
|
||||
public static List<HostPort> parseHostPortList(String hplist) {
|
||||
ArrayList<HostPort> alist = new ArrayList<HostPort>();
|
||||
for (String hp: hplist.split(",")) {
|
||||
int idx = hp.lastIndexOf(':');
|
||||
String host = hp.substring(0, idx);
|
||||
int port;
|
||||
try {
|
||||
port = Integer.parseInt(hp.substring(idx + 1));
|
||||
} catch(RuntimeException e) {
|
||||
throw new RuntimeException("Problem parsing " + hp + e.toString());
|
||||
}
|
||||
alist.add(new HostPort(host,port));
|
||||
}
|
||||
return alist;
|
||||
}
|
||||
|
||||
public static boolean waitForServerUp(String hp, long timeout) {
|
||||
long start = Time.now();
|
||||
while (true) {
|
||||
try {
|
||||
// if there are multiple hostports, just take the first one
|
||||
HostPort hpobj = parseHostPortList(hp).get(0);
|
||||
String result = send4LetterWord(hpobj.host, hpobj.port, "stat");
|
||||
if (result.startsWith("Zookeeper version:") &&
|
||||
!result.contains("READ-ONLY")) {
|
||||
return true;
|
||||
}
|
||||
} catch (IOException e) {
|
||||
// ignore as this is expected
|
||||
LOG.info("server " + hp + " not up " + e);
|
||||
}
|
||||
|
||||
if (Time.now() > start + timeout) {
|
||||
break;
|
||||
}
|
||||
try {
|
||||
Thread.sleep(250);
|
||||
} catch (InterruptedException e) {
|
||||
// ignore
|
||||
}
|
||||
}
|
||||
return false;
|
||||
}
|
||||
public static boolean waitForServerDown(String hp, long timeout) {
|
||||
long start = Time.now();
|
||||
while (true) {
|
||||
try {
|
||||
HostPort hpobj = parseHostPortList(hp).get(0);
|
||||
send4LetterWord(hpobj.host, hpobj.port, "stat");
|
||||
} catch (IOException e) {
|
||||
return true;
|
||||
}
|
||||
|
||||
/**
|
||||
* Send the 4letterword
|
||||
* @param host the destination host
|
||||
* @param port the destination port
|
||||
* @param cmd the 4letterword
|
||||
* @return
|
||||
* @throws IOException
|
||||
*/
|
||||
public static String send4LetterWord(String host, int port, String cmd)
|
||||
throws IOException
|
||||
{
|
||||
LOG.info("connecting to " + host + " " + port);
|
||||
Socket sock = new Socket(host, port);
|
||||
BufferedReader reader = null;
|
||||
try {
|
||||
OutputStream outstream = sock.getOutputStream();
|
||||
outstream.write(cmd.getBytes());
|
||||
outstream.flush();
|
||||
// this replicates NC - close the output stream before reading
|
||||
sock.shutdownOutput();
|
||||
|
||||
reader =
|
||||
new BufferedReader(
|
||||
new InputStreamReader(sock.getInputStream()));
|
||||
StringBuilder sb = new StringBuilder();
|
||||
String line;
|
||||
while((line = reader.readLine()) != null) {
|
||||
sb.append(line + "\n");
|
||||
}
|
||||
return sb.toString();
|
||||
} finally {
|
||||
sock.close();
|
||||
if (reader != null) {
|
||||
reader.close();
|
||||
}
|
||||
}
|
||||
if (Time.now() > start + timeout) {
|
||||
break;
|
||||
}
|
||||
try {
|
||||
Thread.sleep(250);
|
||||
} catch (InterruptedException e) {
|
||||
// ignore
|
||||
}
|
||||
}
|
||||
return false;
|
||||
}
|
||||
|
||||
public static boolean waitForServerUp(String hp, long timeout) {
|
||||
long start = Time.now();
|
||||
while (true) {
|
||||
try {
|
||||
// if there are multiple hostports, just take the first one
|
||||
HostPort hpobj = parseHostPortList(hp).get(0);
|
||||
String result = send4LetterWord(hpobj.host, hpobj.port, "stat");
|
||||
if (result.startsWith("Zookeeper version:") &&
|
||||
!result.contains("READ-ONLY")) {
|
||||
return true;
|
||||
}
|
||||
} catch (IOException e) {
|
||||
// ignore as this is expected
|
||||
LOG.info("server " + hp + " not up " + e);
|
||||
}
|
||||
public static File createTmpDir() throws IOException {
|
||||
return createTmpDir(BASETEST);
|
||||
}
|
||||
static File createTmpDir(File parentDir) throws IOException {
|
||||
File tmpFile = File.createTempFile("test", ".junit", parentDir);
|
||||
// don't delete tmpFile - this ensures we don't attempt to create
|
||||
// a tmpDir with a duplicate name
|
||||
File tmpDir = new File(tmpFile + ".dir");
|
||||
Assert.assertFalse(tmpDir.exists()); // never true if tmpfile does it's job
|
||||
Assert.assertTrue(tmpDir.mkdirs());
|
||||
|
||||
if (Time.now() > start + timeout) {
|
||||
break;
|
||||
}
|
||||
try {
|
||||
Thread.sleep(250);
|
||||
} catch (InterruptedException e) {
|
||||
// ignore
|
||||
}
|
||||
}
|
||||
return false;
|
||||
return tmpDir;
|
||||
}
|
||||
|
||||
private static int getPort(String hostPort) {
|
||||
String[] split = hostPort.split(":");
|
||||
String portstr = split[split.length-1];
|
||||
String[] pc = portstr.split("/");
|
||||
if (pc.length > 1) {
|
||||
portstr = pc[0];
|
||||
}
|
||||
public static boolean waitForServerDown(String hp, long timeout) {
|
||||
long start = Time.now();
|
||||
while (true) {
|
||||
try {
|
||||
HostPort hpobj = parseHostPortList(hp).get(0);
|
||||
send4LetterWord(hpobj.host, hpobj.port, "stat");
|
||||
} catch (IOException e) {
|
||||
return true;
|
||||
}
|
||||
return Integer.parseInt(portstr);
|
||||
}
|
||||
|
||||
if (Time.now() > start + timeout) {
|
||||
break;
|
||||
}
|
||||
try {
|
||||
Thread.sleep(250);
|
||||
} catch (InterruptedException e) {
|
||||
// ignore
|
||||
}
|
||||
}
|
||||
return false;
|
||||
static ServerCnxnFactory createNewServerInstance(File dataDir,
|
||||
ServerCnxnFactory factory, String hostPort, int maxCnxns)
|
||||
throws IOException, InterruptedException
|
||||
{
|
||||
ZooKeeperServer zks = new ZooKeeperServer(dataDir, dataDir, 3000);
|
||||
final int PORT = getPort(hostPort);
|
||||
if (factory == null) {
|
||||
factory = ServerCnxnFactory.createFactory(PORT, maxCnxns);
|
||||
}
|
||||
factory.startup(zks);
|
||||
Assert.assertTrue("waiting for server up",
|
||||
ClientBaseWithFixes.waitForServerUp("127.0.0.1:" + PORT,
|
||||
CONNECTION_TIMEOUT));
|
||||
|
||||
public static File createTmpDir() throws IOException {
|
||||
return createTmpDir(BASETEST);
|
||||
}
|
||||
static File createTmpDir(File parentDir) throws IOException {
|
||||
File tmpFile = File.createTempFile("test", ".junit", parentDir);
|
||||
// don't delete tmpFile - this ensures we don't attempt to create
|
||||
// a tmpDir with a duplicate name
|
||||
File tmpDir = new File(tmpFile + ".dir");
|
||||
Assert.assertFalse(tmpDir.exists()); // never true if tmpfile does it's job
|
||||
Assert.assertTrue(tmpDir.mkdirs());
|
||||
return factory;
|
||||
}
|
||||
|
||||
return tmpDir;
|
||||
}
|
||||
static void shutdownServerInstance(ServerCnxnFactory factory,
|
||||
String hostPort)
|
||||
{
|
||||
if (factory != null) {
|
||||
ZKDatabase zkDb;
|
||||
{
|
||||
ZooKeeperServer zs = getServer(factory);
|
||||
|
||||
private static int getPort(String hostPort) {
|
||||
String[] split = hostPort.split(":");
|
||||
String portstr = split[split.length-1];
|
||||
String[] pc = portstr.split("/");
|
||||
if (pc.length > 1) {
|
||||
portstr = pc[0];
|
||||
}
|
||||
return Integer.parseInt(portstr);
|
||||
}
|
||||
zkDb = zs.getZKDatabase();
|
||||
}
|
||||
factory.shutdown();
|
||||
try {
|
||||
zkDb.close();
|
||||
} catch (IOException ie) {
|
||||
LOG.warn("Error closing logs ", ie);
|
||||
}
|
||||
final int PORT = getPort(hostPort);
|
||||
|
||||
static ServerCnxnFactory createNewServerInstance(File dataDir,
|
||||
ServerCnxnFactory factory, String hostPort, int maxCnxns)
|
||||
throws IOException, InterruptedException
|
||||
{
|
||||
ZooKeeperServer zks = new ZooKeeperServer(dataDir, dataDir, 3000);
|
||||
final int PORT = getPort(hostPort);
|
||||
if (factory == null) {
|
||||
factory = ServerCnxnFactory.createFactory(PORT, maxCnxns);
|
||||
}
|
||||
factory.startup(zks);
|
||||
Assert.assertTrue("waiting for server up",
|
||||
ClientBaseWithFixes.waitForServerUp("127.0.0.1:" + PORT,
|
||||
Assert.assertTrue("waiting for server down",
|
||||
ClientBaseWithFixes.waitForServerDown("127.0.0.1:" + PORT,
|
||||
CONNECTION_TIMEOUT));
|
||||
|
||||
return factory;
|
||||
}
|
||||
}
|
||||
|
||||
static void shutdownServerInstance(ServerCnxnFactory factory,
|
||||
String hostPort)
|
||||
{
|
||||
if (factory != null) {
|
||||
ZKDatabase zkDb;
|
||||
{
|
||||
ZooKeeperServer zs = getServer(factory);
|
||||
|
||||
zkDb = zs.getZKDatabase();
|
||||
}
|
||||
factory.shutdown();
|
||||
try {
|
||||
zkDb.close();
|
||||
} catch (IOException ie) {
|
||||
LOG.warn("Error closing logs ", ie);
|
||||
}
|
||||
final int PORT = getPort(hostPort);
|
||||
/**
|
||||
* Test specific setup
|
||||
*/
|
||||
public static void setupTestEnv() {
|
||||
// during the tests we run with 100K prealloc in the logs.
|
||||
// on windows systems prealloc of 64M was seen to take ~15seconds
|
||||
// resulting in test Assert.failure (client timeout on first session).
|
||||
// set env and directly in order to handle static init/gc issues
|
||||
System.setProperty("zookeeper.preAllocSize", "100");
|
||||
FileTxnLog.setPreallocSize(100 * 1024);
|
||||
}
|
||||
|
||||
Assert.assertTrue("waiting for server down",
|
||||
ClientBaseWithFixes.waitForServerDown("127.0.0.1:" + PORT,
|
||||
CONNECTION_TIMEOUT));
|
||||
}
|
||||
protected void setUpAll() throws Exception {
|
||||
allClients = new LinkedList<ZooKeeper>();
|
||||
allClientsSetup = true;
|
||||
}
|
||||
|
||||
@Before
|
||||
public void setUp() throws Exception {
|
||||
BASETEST.mkdirs();
|
||||
|
||||
setupTestEnv();
|
||||
|
||||
setUpAll();
|
||||
|
||||
tmpDir = createTmpDir(BASETEST);
|
||||
|
||||
startServer();
|
||||
|
||||
LOG.info("Client test setup finished");
|
||||
}
|
||||
|
||||
private String initHostPort() {
|
||||
BASETEST.mkdirs();
|
||||
int port = 0;
|
||||
try {
|
||||
port = ServerSocketUtil.getPort(port, 100);
|
||||
} catch (IOException e) {
|
||||
throw new RuntimeException(e);
|
||||
}
|
||||
return "127.0.0.1:" + port;
|
||||
}
|
||||
|
||||
/**
|
||||
* Test specific setup
|
||||
*/
|
||||
public static void setupTestEnv() {
|
||||
// during the tests we run with 100K prealloc in the logs.
|
||||
// on windows systems prealloc of 64M was seen to take ~15seconds
|
||||
// resulting in test Assert.failure (client timeout on first session).
|
||||
// set env and directly in order to handle static init/gc issues
|
||||
System.setProperty("zookeeper.preAllocSize", "100");
|
||||
FileTxnLog.setPreallocSize(100 * 1024);
|
||||
}
|
||||
protected void startServer() throws Exception {
|
||||
LOG.info("STARTING server");
|
||||
serverFactory = createNewServerInstance(tmpDir, serverFactory, hostPort, maxCnxns);
|
||||
}
|
||||
|
||||
protected void setUpAll() throws Exception {
|
||||
allClients = new LinkedList<ZooKeeper>();
|
||||
allClientsSetup = true;
|
||||
}
|
||||
protected void stopServer() throws Exception {
|
||||
LOG.info("STOPPING server");
|
||||
shutdownServerInstance(serverFactory, hostPort);
|
||||
serverFactory = null;
|
||||
}
|
||||
|
||||
@Before
|
||||
public void setUp() throws Exception {
|
||||
BASETEST.mkdirs();
|
||||
|
||||
setupTestEnv();
|
||||
protected static ZooKeeperServer getServer(ServerCnxnFactory fac) {
|
||||
return fac.getZooKeeperServer();
|
||||
}
|
||||
|
||||
setUpAll();
|
||||
|
||||
tmpDir = createTmpDir(BASETEST);
|
||||
|
||||
startServer();
|
||||
|
||||
LOG.info("Client test setup finished");
|
||||
}
|
||||
|
||||
private String initHostPort() {
|
||||
BASETEST.mkdirs();
|
||||
int port = 0;
|
||||
protected void tearDownAll() throws Exception {
|
||||
synchronized (this) {
|
||||
if (allClients != null) for (ZooKeeper zk : allClients) {
|
||||
try {
|
||||
port = ServerSocketUtil.getPort(port, 100);
|
||||
} catch (IOException e) {
|
||||
throw new RuntimeException(e);
|
||||
if (zk != null)
|
||||
zk.close();
|
||||
} catch (InterruptedException e) {
|
||||
LOG.warn("ignoring interrupt", e);
|
||||
}
|
||||
return "127.0.0.1:" + port;
|
||||
}
|
||||
allClients = null;
|
||||
}
|
||||
}
|
||||
|
||||
@After
|
||||
public void tearDown() throws Exception {
|
||||
LOG.info("tearDown starting");
|
||||
|
||||
tearDownAll();
|
||||
|
||||
stopServer();
|
||||
|
||||
if (tmpDir != null) {
|
||||
Assert.assertTrue("delete " + tmpDir.toString(), recursiveDelete(tmpDir));
|
||||
}
|
||||
|
||||
protected void startServer() throws Exception {
|
||||
LOG.info("STARTING server");
|
||||
serverFactory = createNewServerInstance(tmpDir, serverFactory, hostPort, maxCnxns);
|
||||
}
|
||||
|
||||
protected void stopServer() throws Exception {
|
||||
LOG.info("STOPPING server");
|
||||
shutdownServerInstance(serverFactory, hostPort);
|
||||
serverFactory = null;
|
||||
}
|
||||
|
||||
|
||||
protected static ZooKeeperServer getServer(ServerCnxnFactory fac) {
|
||||
ZooKeeperServer zs = ServerCnxnFactoryAccessor.getZkServer(fac);
|
||||
|
||||
return zs;
|
||||
}
|
||||
|
||||
protected void tearDownAll() throws Exception {
|
||||
synchronized (this) {
|
||||
if (allClients != null) for (ZooKeeper zk : allClients) {
|
||||
try {
|
||||
if (zk != null)
|
||||
zk.close();
|
||||
} catch (InterruptedException e) {
|
||||
LOG.warn("ignoring interrupt", e);
|
||||
}
|
||||
}
|
||||
allClients = null;
|
||||
}
|
||||
}
|
||||
|
||||
@After
|
||||
public void tearDown() throws Exception {
|
||||
LOG.info("tearDown starting");
|
||||
|
||||
tearDownAll();
|
||||
|
||||
stopServer();
|
||||
|
||||
if (tmpDir != null) {
|
||||
Assert.assertTrue("delete " + tmpDir.toString(), recursiveDelete(tmpDir));
|
||||
}
|
||||
|
||||
// This has to be set to null when the same instance of this class is reused between test cases
|
||||
serverFactory = null;
|
||||
}
|
||||
|
||||
public static boolean recursiveDelete(File d) {
|
||||
if (d.isDirectory()) {
|
||||
File children[] = d.listFiles();
|
||||
for (File f : children) {
|
||||
Assert.assertTrue("delete " + f.toString(), recursiveDelete(f));
|
||||
}
|
||||
}
|
||||
return d.delete();
|
||||
// This has to be set to null when the same instance of this class is reused between test cases
|
||||
serverFactory = null;
|
||||
}
|
||||
|
||||
public static boolean recursiveDelete(File d) {
|
||||
if (d.isDirectory()) {
|
||||
File children[] = d.listFiles();
|
||||
for (File f : children) {
|
||||
Assert.assertTrue("delete " + f.toString(), recursiveDelete(f));
|
||||
}
|
||||
}
|
||||
return d.delete();
|
||||
}
|
||||
}
|
||||
|
|
|
@ -23,6 +23,7 @@ import java.util.Random;
|
|||
import org.apache.hadoop.conf.Configuration;
|
||||
import org.apache.hadoop.fs.CommonConfigurationKeys;
|
||||
import org.apache.hadoop.util.Time;
|
||||
import org.apache.zookeeper.server.ServerCnxn;
|
||||
import org.junit.After;
|
||||
import org.junit.Before;
|
||||
import org.junit.Test;
|
||||
|
@ -131,7 +132,7 @@ public class TestZKFailoverControllerStress extends ClientBaseWithFixes {
|
|||
long st = Time.now();
|
||||
while (Time.now() - st < runFor) {
|
||||
cluster.getTestContext().checkException();
|
||||
serverFactory.closeAll();
|
||||
serverFactory.closeAll(ServerCnxn.DisconnectReason.SERVER_SHUTDOWN);
|
||||
Thread.sleep(50);
|
||||
}
|
||||
}
|
||||
|
|
|
@ -135,6 +135,17 @@
|
|||
<artifactId>dnsjava</artifactId>
|
||||
</dependency>
|
||||
|
||||
<dependency>
|
||||
<groupId>io.dropwizard.metrics</groupId>
|
||||
<artifactId>metrics-core</artifactId>
|
||||
</dependency>
|
||||
|
||||
<dependency>
|
||||
<groupId>org.xerial.snappy</groupId>
|
||||
<artifactId>snappy-java</artifactId>
|
||||
<scope>provided</scope>
|
||||
</dependency>
|
||||
|
||||
</dependencies>
|
||||
|
||||
<build>
|
||||
|
|
|
@ -229,7 +229,7 @@ public class MicroZookeeperService
|
|||
setupSecurity();
|
||||
|
||||
FileTxnSnapLog ftxn = new FileTxnSnapLog(dataDir, dataDir);
|
||||
ZooKeeperServer zkServer = new ZooKeeperServer(ftxn, tickTime);
|
||||
ZooKeeperServer zkServer = new ZooKeeperServer(ftxn, tickTime, "");
|
||||
|
||||
LOG.info("Starting Local Zookeeper service");
|
||||
factory = ServerCnxnFactory.createFactory();
|
||||
|
|
|
@ -83,6 +83,16 @@ https://maven.apache.org/xsd/maven-4.0.0.xsd">
|
|||
<type>test-jar</type>
|
||||
<scope>test</scope>
|
||||
</dependency>
|
||||
<dependency>
|
||||
<groupId>io.dropwizard.metrics</groupId>
|
||||
<artifactId>metrics-core</artifactId>
|
||||
<scope>provided</scope>
|
||||
</dependency>
|
||||
<dependency>
|
||||
<groupId>org.xerial.snappy</groupId>
|
||||
<artifactId>snappy-java</artifactId>
|
||||
<scope>provided</scope>
|
||||
</dependency>
|
||||
<dependency>
|
||||
<groupId>org.apache.hadoop.thirdparty</groupId>
|
||||
<artifactId>hadoop-shaded-guava</artifactId>
|
||||
|
|
|
@ -62,6 +62,16 @@ https://maven.apache.org/xsd/maven-4.0.0.xsd">
|
|||
<type>test-jar</type>
|
||||
<scope>test</scope>
|
||||
</dependency>
|
||||
<dependency>
|
||||
<groupId>io.dropwizard.metrics</groupId>
|
||||
<artifactId>metrics-core</artifactId>
|
||||
<scope>provided</scope>
|
||||
</dependency>
|
||||
<dependency>
|
||||
<groupId>org.xerial.snappy</groupId>
|
||||
<artifactId>snappy-java</artifactId>
|
||||
<scope>provided</scope>
|
||||
</dependency>
|
||||
<dependency>
|
||||
<groupId>org.apache.hadoop.thirdparty</groupId>
|
||||
<artifactId>hadoop-shaded-guava</artifactId>
|
||||
|
|
|
@ -99,8 +99,8 @@
|
|||
<hadoop-thirdparty-shaded-protobuf-prefix>${hadoop-thirdparty-shaded-prefix}.protobuf</hadoop-thirdparty-shaded-protobuf-prefix>
|
||||
<hadoop-thirdparty-shaded-guava-prefix>${hadoop-thirdparty-shaded-prefix}.com.google.common</hadoop-thirdparty-shaded-guava-prefix>
|
||||
|
||||
<zookeeper.version>3.5.6</zookeeper.version>
|
||||
<curator.version>4.2.0</curator.version>
|
||||
<zookeeper.version>3.6.3</zookeeper.version>
|
||||
<curator.version>5.2.0</curator.version>
|
||||
<findbugs.version>3.0.5</findbugs.version>
|
||||
<dnsjava.version>2.1.7</dnsjava.version>
|
||||
|
||||
|
|
|
@ -104,6 +104,17 @@
|
|||
</exclusions>
|
||||
</dependency>
|
||||
|
||||
<dependency>
|
||||
<groupId>io.dropwizard.metrics</groupId>
|
||||
<artifactId>metrics-core</artifactId>
|
||||
</dependency>
|
||||
|
||||
<dependency>
|
||||
<groupId>org.xerial.snappy</groupId>
|
||||
<artifactId>snappy-java</artifactId>
|
||||
<scope>provided</scope>
|
||||
</dependency>
|
||||
|
||||
<dependency>
|
||||
<groupId>org.slf4j</groupId>
|
||||
<artifactId>slf4j-api</artifactId>
|
||||
|
|
|
@ -89,6 +89,16 @@
|
|||
<type>test-jar</type>
|
||||
<scope>test</scope>
|
||||
</dependency>
|
||||
<dependency>
|
||||
<groupId>io.dropwizard.metrics</groupId>
|
||||
<artifactId>metrics-core</artifactId>
|
||||
<scope>provided</scope>
|
||||
</dependency>
|
||||
<dependency>
|
||||
<groupId>org.xerial.snappy</groupId>
|
||||
<artifactId>snappy-java</artifactId>
|
||||
<scope>provided</scope>
|
||||
</dependency>
|
||||
<dependency>
|
||||
<groupId>junit</groupId>
|
||||
<artifactId>junit</artifactId>
|
||||
|
|
|
@ -109,6 +109,15 @@
|
|||
<groupId>org.apache.zookeeper</groupId>
|
||||
<artifactId>zookeeper</artifactId>
|
||||
</dependency>
|
||||
<dependency>
|
||||
<groupId>io.dropwizard.metrics</groupId>
|
||||
<artifactId>metrics-core</artifactId>
|
||||
</dependency>
|
||||
<dependency>
|
||||
<groupId>org.xerial.snappy</groupId>
|
||||
<artifactId>snappy-java</artifactId>
|
||||
<scope>provided</scope>
|
||||
</dependency>
|
||||
<dependency>
|
||||
<groupId>${leveldbjni.group}</groupId>
|
||||
<artifactId>leveldbjni-all</artifactId>
|
||||
|
|
|
@ -217,6 +217,15 @@
|
|||
<groupId>org.apache.zookeeper</groupId>
|
||||
<artifactId>zookeeper</artifactId>
|
||||
</dependency>
|
||||
<dependency>
|
||||
<groupId>io.dropwizard.metrics</groupId>
|
||||
<artifactId>metrics-core</artifactId>
|
||||
</dependency>
|
||||
<dependency>
|
||||
<groupId>org.xerial.snappy</groupId>
|
||||
<artifactId>snappy-java</artifactId>
|
||||
<scope>provided</scope>
|
||||
</dependency>
|
||||
<dependency>
|
||||
<groupId>${leveldbjni.group}</groupId>
|
||||
<artifactId>leveldbjni-all</artifactId>
|
||||
|
|
Loading…
Reference in New Issue