Fix for https://issues.apache.org/jira/browse/AMQ-5082 ActiveMQ replicatedLevelDB cluster breaks, all nodes stop listening.

Many thanks to Jim Robinson (jim.robinson@gmail.com) for the patch!
This commit is contained in:
Christian Posta 2015-03-30 17:07:12 -07:00
parent 351d4b9dea
commit a39e51e051
4 changed files with 188 additions and 10 deletions

View File

@ -72,7 +72,9 @@ object ZooKeeperGroup {
*/ */
class ZooKeeperGroup(val zk: ZKClient, val root: String) extends LifecycleListener with ChangeListenerSupport { class ZooKeeperGroup(val zk: ZKClient, val root: String) extends LifecycleListener with ChangeListenerSupport {
val tree = new ZooKeeperTreeTracker[Array[Byte]](zk, new ZKByteArrayDataReader, root, 1) var tree = new ZooKeeperTreeTracker[Array[Byte]](zk, new ZKByteArrayDataReader, root, 1)
var rebuildTree = false
val joins = HashMap[String, Int]() val joins = HashMap[String, Int]()
var members = new LinkedHashMap[String, Array[Byte]] var members = new LinkedHashMap[String, Array[Byte]]
@ -82,12 +84,13 @@ class ZooKeeperGroup(val zk: ZKClient, val root: String) extends LifecycleListen
zk.registerListener(this) zk.registerListener(this)
create(root) create(root)
tree.track(new NodeEventsListener[Array[Byte]]() { var treeEventHandler = new NodeEventsListener[Array[Byte]]() {
def onEvents(events: Collection[NodeEvent[Array[Byte]]]): Unit = { def onEvents(events: Collection[NodeEvent[Array[Byte]]]): Unit = {
if( !closed ) if( !closed )
fire_cluster_change fire_cluster_change;
} }
}) }
tree.track(treeEventHandler)
fire_cluster_change fire_cluster_change
@volatile @volatile
@ -110,7 +113,21 @@ class ZooKeeperGroup(val zk: ZKClient, val root: String) extends LifecycleListen
} }
def connected = zk.isConnected def connected = zk.isConnected
def onConnected() = fireConnected() def onConnected() = {
this.synchronized {
// underlying ZooKeeperTreeTracker isn't rebuilding itself after
// the loss of the session, so we need to destroy/rebuild it on
// reconnect.
if (rebuildTree) {
tree.destroy
tree = new ZooKeeperTreeTracker[Array[Byte]](zk, new ZKByteArrayDataReader, root, 1)
tree.track(treeEventHandler)
} else {
rebuildTree = true
}
}
fireConnected()
}
def onDisconnected() = { def onDisconnected() = {
this.members = new LinkedHashMap() this.members = new LinkedHashMap()
fireDisconnected() fireDisconnected()
@ -187,5 +204,4 @@ class ZooKeeperGroup(val zk: ZKClient, val root: String) extends LifecycleListen
} }
} }
} }
} }

View File

@ -182,6 +182,67 @@ public class ElectingLevelDBStoreTest extends ZooKeeperTestSupport {
}); });
} }
/*
* testAMQ5082 tests the behavior of an ElectingLevelDBStore
* pool when ZooKeeper I/O timeouts occur. See issue AMQ-5082.
*/
@Test(timeout = 1000 * 60 * 5)
public void testAMQ5082() throws Throwable {
final ArrayList<ElectingLevelDBStore> stores = new ArrayList<ElectingLevelDBStore>();
LOG.info("Launching 3 stores");
for (String dir : new String[]{"leveldb-node1", "leveldb-node2", "leveldb-node3"}) {
ElectingLevelDBStore store = createStoreNode();
store.setDirectory(new File(data_dir(), dir));
stores.add(store);
asyncStart(store);
}
LOG.info("Waiting 30s for stores to start");
Thread.sleep(30 * 1000);
LOG.info("Checking for a single master");
ElectingLevelDBStore master = null;
for (ElectingLevelDBStore store: stores) {
if (store.isMaster()) {
assertNull(master);
master = store;
}
}
assertNotNull(master);
LOG.info("Imposing 1s I/O wait on Zookeeper connections, waiting 30s to confirm that quorum is not lost");
this.connector.testHandle.setIOWaitMillis(1 * 1000, 30 * 1000);
LOG.info("Confirming that the quorum has not been lost");
for (ElectingLevelDBStore store: stores) {
if (store.isMaster()) {
assertTrue(master == store);
}
}
LOG.info("Imposing 11s I/O wait on Zookeeper connections, waiting 30s for quorum to be lost");
this.connector.testHandle.setIOWaitMillis(11 * 1000, 30 * 1000);
LOG.info("Confirming that the quorum has been lost");
for (ElectingLevelDBStore store: stores) {
assertFalse(store.isMaster());
}
master = null;
LOG.info("Lifting I/O wait on Zookeeper connections, waiting 30s for quorum to be re-established");
this.connector.testHandle.setIOWaitMillis(0, 30 * 1000);
LOG.info("Checking for a single master");
for (ElectingLevelDBStore store: stores) {
if (store.isMaster()) {
assertNull(master);
master = store;
}
}
assertNotNull(master);
}
@After @After
public void stop() throws Exception { public void stop() throws Exception {
if (master != null) { if (master != null) {

View File

@ -19,7 +19,7 @@ package org.apache.activemq.leveldb.test;
import org.apache.activemq.leveldb.CountDownFuture; import org.apache.activemq.leveldb.CountDownFuture;
import org.apache.activemq.leveldb.util.FileSupport; import org.apache.activemq.leveldb.util.FileSupport;
import org.apache.commons.io.FileUtils; import org.apache.commons.io.FileUtils;
import org.apache.zookeeper.server.NIOServerCnxnFactory; import org.apache.zookeeper.server.TestServerCnxnFactory;
import org.apache.zookeeper.server.ZooKeeperServer; import org.apache.zookeeper.server.ZooKeeperServer;
import org.apache.zookeeper.server.persistence.FileTxnSnapLog; import org.apache.zookeeper.server.persistence.FileTxnSnapLog;
import org.junit.After; import org.junit.After;
@ -34,7 +34,7 @@ import java.util.concurrent.TimeUnit;
*/ */
public class ZooKeeperTestSupport { public class ZooKeeperTestSupport {
protected NIOServerCnxnFactory connector; protected TestServerCnxnFactory connector;
static File data_dir() { static File data_dir() {
return new File("target/activemq-data/leveldb-elections"); return new File("target/activemq-data/leveldb-elections");
@ -49,7 +49,7 @@ public class ZooKeeperTestSupport {
ZooKeeperServer zk_server = new ZooKeeperServer(); ZooKeeperServer zk_server = new ZooKeeperServer();
zk_server.setTickTime(500); zk_server.setTickTime(500);
zk_server.setTxnLogFactory(new FileTxnSnapLog(new File(data_dir(), "zk-log"), new File(data_dir(), "zk-data"))); zk_server.setTxnLogFactory(new FileTxnSnapLog(new File(data_dir(), "zk-log"), new File(data_dir(), "zk-data")));
connector = new NIOServerCnxnFactory(); connector = new TestServerCnxnFactory();
connector.configure(new InetSocketAddress(0), 100); connector.configure(new InetSocketAddress(0), 100);
connector.startup(zk_server); connector.startup(zk_server);
System.out.println("ZooKeeper started"); System.out.println("ZooKeeper started");

View File

@ -0,0 +1,101 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.zookeeper.server;
import java.io.IOException;
import java.nio.channels.SelectionKey;
import java.nio.channels.SocketChannel;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/*
* TestServerCnxnFactory allows a caller to impose an artifical
* wait on I/O over the ServerCnxn used to communicate with the
* ZooKeeper server.
*/
public class TestServerCnxnFactory extends NIOServerCnxnFactory {
protected static final Logger LOG = LoggerFactory.getLogger(TestServerCnxnFactory.class);
/* testHandle controls whehter or not an artifical wait
* is imposed when talking to the ZooKeeper server
*/
public TestHandle testHandle = new TestHandle();
public TestServerCnxnFactory() throws IOException {
super();
}
protected NIOServerCnxn createConnection(SocketChannel sock, SelectionKey sk) throws IOException {
return new TestServerCnxn(this.zkServer, sock, sk, this, testHandle);
}
/*
* TestHandle is handed to TestServerCnxn and is used to
* control the amount of time the TestServerCnxn waits
* before allowing an I/O operation.
*/
public class TestHandle {
private Object mu = new Object();
private int ioWaitMillis = 0;
/*
* Set an artifical I/O wait (in milliseconds) on ServerCnxn and
* then sleep for the specified number of milliseconds.
*/
public void setIOWaitMillis(int ioWaitMillis, int sleepMillis) {
synchronized(mu) {
this.ioWaitMillis = ioWaitMillis;
}
if (sleepMillis > 0) {
try {
Thread.sleep(sleepMillis);
} catch (InterruptedException e) {}
}
}
/*
* Get the number of milliseconds to wait before
* allowing ServerCnxn to perform I/O.
*/
public int getIOWaitMillis() {
synchronized(mu) {
return this.ioWaitMillis;
}
}
}
public class TestServerCnxn extends NIOServerCnxn {
public TestHandle testHandle;
public TestServerCnxn(ZooKeeperServer zk, SocketChannel sock, SelectionKey sk, NIOServerCnxnFactory factory, TestHandle testHandle) throws IOException {
super(zk, sock, sk, factory);
this.testHandle = testHandle;
}
public void doIO(SelectionKey k) throws InterruptedException {
final int millis = this.testHandle.getIOWaitMillis();
if (millis > 0) {
LOG.info("imposing a "+millis+" millisecond wait on ServerCxn: "+this);
try {
Thread.sleep(millis);
} catch (InterruptedException e) {}
}
super.doIO(k);
}
}
}