From a39e51e0519852332f7779443c3db09b6e691d49 Mon Sep 17 00:00:00 2001 From: Christian Posta Date: Mon, 30 Mar 2015 17:07:12 -0700 Subject: [PATCH] Fix for https://issues.apache.org/jira/browse/AMQ-5082 ActiveMQ replicatedLevelDB cluster breaks, all nodes stop listening. Many thanks to Jim Robinson (jim.robinson@gmail.com) for the patch! --- .../replicated/groups/ZooKeeperGroup.scala | 30 ++++-- .../test/ElectingLevelDBStoreTest.java | 61 +++++++++++ .../leveldb/test/ZooKeeperTestSupport.java | 6 +- .../server/TestServerCnxnFactory.java | 101 ++++++++++++++++++ 4 files changed, 188 insertions(+), 10 deletions(-) create mode 100644 activemq-leveldb-store/src/test/java/org/apache/zookeeper/server/TestServerCnxnFactory.java diff --git a/activemq-leveldb-store/src/main/scala/org/apache/activemq/leveldb/replicated/groups/ZooKeeperGroup.scala b/activemq-leveldb-store/src/main/scala/org/apache/activemq/leveldb/replicated/groups/ZooKeeperGroup.scala index 39399d1b46..99808ed0a2 100644 --- a/activemq-leveldb-store/src/main/scala/org/apache/activemq/leveldb/replicated/groups/ZooKeeperGroup.scala +++ b/activemq-leveldb-store/src/main/scala/org/apache/activemq/leveldb/replicated/groups/ZooKeeperGroup.scala @@ -72,7 +72,9 @@ object ZooKeeperGroup { */ class ZooKeeperGroup(val zk: ZKClient, val root: String) extends LifecycleListener with ChangeListenerSupport { - val tree = new ZooKeeperTreeTracker[Array[Byte]](zk, new ZKByteArrayDataReader, root, 1) + var tree = new ZooKeeperTreeTracker[Array[Byte]](zk, new ZKByteArrayDataReader, root, 1) + var rebuildTree = false + val joins = HashMap[String, Int]() var members = new LinkedHashMap[String, Array[Byte]] @@ -82,12 +84,13 @@ class ZooKeeperGroup(val zk: ZKClient, val root: String) extends LifecycleListen zk.registerListener(this) create(root) - tree.track(new NodeEventsListener[Array[Byte]]() { + var treeEventHandler = new NodeEventsListener[Array[Byte]]() { def onEvents(events: Collection[NodeEvent[Array[Byte]]]): Unit = { if( !closed ) - fire_cluster_change + fire_cluster_change; } - }) + } + tree.track(treeEventHandler) fire_cluster_change @volatile @@ -110,7 +113,21 @@ class ZooKeeperGroup(val zk: ZKClient, val root: String) extends LifecycleListen } def connected = zk.isConnected - def onConnected() = fireConnected() + def onConnected() = { + this.synchronized { + // underlying ZooKeeperTreeTracker isn't rebuilding itself after + // the loss of the session, so we need to destroy/rebuild it on + // reconnect. + if (rebuildTree) { + tree.destroy + tree = new ZooKeeperTreeTracker[Array[Byte]](zk, new ZKByteArrayDataReader, root, 1) + tree.track(treeEventHandler) + } else { + rebuildTree = true + } + } + fireConnected() + } def onDisconnected() = { this.members = new LinkedHashMap() fireDisconnected() @@ -187,5 +204,4 @@ class ZooKeeperGroup(val zk: ZKClient, val root: String) extends LifecycleListen } } } - -} \ No newline at end of file +} diff --git a/activemq-leveldb-store/src/test/java/org/apache/activemq/leveldb/test/ElectingLevelDBStoreTest.java b/activemq-leveldb-store/src/test/java/org/apache/activemq/leveldb/test/ElectingLevelDBStoreTest.java index 8dcaa8e76f..4d852de769 100644 --- a/activemq-leveldb-store/src/test/java/org/apache/activemq/leveldb/test/ElectingLevelDBStoreTest.java +++ b/activemq-leveldb-store/src/test/java/org/apache/activemq/leveldb/test/ElectingLevelDBStoreTest.java @@ -182,6 +182,67 @@ public class ElectingLevelDBStoreTest extends ZooKeeperTestSupport { }); } + /* + * testAMQ5082 tests the behavior of an ElectingLevelDBStore + * pool when ZooKeeper I/O timeouts occur. See issue AMQ-5082. + */ + @Test(timeout = 1000 * 60 * 5) + public void testAMQ5082() throws Throwable { + final ArrayList stores = new ArrayList(); + + LOG.info("Launching 3 stores"); + for (String dir : new String[]{"leveldb-node1", "leveldb-node2", "leveldb-node3"}) { + ElectingLevelDBStore store = createStoreNode(); + store.setDirectory(new File(data_dir(), dir)); + stores.add(store); + asyncStart(store); + } + + LOG.info("Waiting 30s for stores to start"); + Thread.sleep(30 * 1000); + + LOG.info("Checking for a single master"); + ElectingLevelDBStore master = null; + for (ElectingLevelDBStore store: stores) { + if (store.isMaster()) { + assertNull(master); + master = store; + } + } + assertNotNull(master); + + LOG.info("Imposing 1s I/O wait on Zookeeper connections, waiting 30s to confirm that quorum is not lost"); + this.connector.testHandle.setIOWaitMillis(1 * 1000, 30 * 1000); + + LOG.info("Confirming that the quorum has not been lost"); + for (ElectingLevelDBStore store: stores) { + if (store.isMaster()) { + assertTrue(master == store); + } + } + + LOG.info("Imposing 11s I/O wait on Zookeeper connections, waiting 30s for quorum to be lost"); + this.connector.testHandle.setIOWaitMillis(11 * 1000, 30 * 1000); + + LOG.info("Confirming that the quorum has been lost"); + for (ElectingLevelDBStore store: stores) { + assertFalse(store.isMaster()); + } + master = null; + + LOG.info("Lifting I/O wait on Zookeeper connections, waiting 30s for quorum to be re-established"); + this.connector.testHandle.setIOWaitMillis(0, 30 * 1000); + + LOG.info("Checking for a single master"); + for (ElectingLevelDBStore store: stores) { + if (store.isMaster()) { + assertNull(master); + master = store; + } + } + assertNotNull(master); + } + @After public void stop() throws Exception { if (master != null) { diff --git a/activemq-leveldb-store/src/test/java/org/apache/activemq/leveldb/test/ZooKeeperTestSupport.java b/activemq-leveldb-store/src/test/java/org/apache/activemq/leveldb/test/ZooKeeperTestSupport.java index 7498d98b1a..34422beeaa 100644 --- a/activemq-leveldb-store/src/test/java/org/apache/activemq/leveldb/test/ZooKeeperTestSupport.java +++ b/activemq-leveldb-store/src/test/java/org/apache/activemq/leveldb/test/ZooKeeperTestSupport.java @@ -19,7 +19,7 @@ package org.apache.activemq.leveldb.test; import org.apache.activemq.leveldb.CountDownFuture; import org.apache.activemq.leveldb.util.FileSupport; import org.apache.commons.io.FileUtils; -import org.apache.zookeeper.server.NIOServerCnxnFactory; +import org.apache.zookeeper.server.TestServerCnxnFactory; import org.apache.zookeeper.server.ZooKeeperServer; import org.apache.zookeeper.server.persistence.FileTxnSnapLog; import org.junit.After; @@ -34,7 +34,7 @@ import java.util.concurrent.TimeUnit; */ public class ZooKeeperTestSupport { - protected NIOServerCnxnFactory connector; + protected TestServerCnxnFactory connector; static File data_dir() { return new File("target/activemq-data/leveldb-elections"); @@ -49,7 +49,7 @@ public class ZooKeeperTestSupport { ZooKeeperServer zk_server = new ZooKeeperServer(); zk_server.setTickTime(500); zk_server.setTxnLogFactory(new FileTxnSnapLog(new File(data_dir(), "zk-log"), new File(data_dir(), "zk-data"))); - connector = new NIOServerCnxnFactory(); + connector = new TestServerCnxnFactory(); connector.configure(new InetSocketAddress(0), 100); connector.startup(zk_server); System.out.println("ZooKeeper started"); diff --git a/activemq-leveldb-store/src/test/java/org/apache/zookeeper/server/TestServerCnxnFactory.java b/activemq-leveldb-store/src/test/java/org/apache/zookeeper/server/TestServerCnxnFactory.java new file mode 100644 index 0000000000..cc0adca742 --- /dev/null +++ b/activemq-leveldb-store/src/test/java/org/apache/zookeeper/server/TestServerCnxnFactory.java @@ -0,0 +1,101 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.zookeeper.server; + +import java.io.IOException; +import java.nio.channels.SelectionKey; +import java.nio.channels.SocketChannel; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/* + * TestServerCnxnFactory allows a caller to impose an artifical + * wait on I/O over the ServerCnxn used to communicate with the + * ZooKeeper server. + */ +public class TestServerCnxnFactory extends NIOServerCnxnFactory { + protected static final Logger LOG = LoggerFactory.getLogger(TestServerCnxnFactory.class); + + /* testHandle controls whehter or not an artifical wait + * is imposed when talking to the ZooKeeper server + */ + public TestHandle testHandle = new TestHandle(); + + public TestServerCnxnFactory() throws IOException { + super(); + } + + protected NIOServerCnxn createConnection(SocketChannel sock, SelectionKey sk) throws IOException { + return new TestServerCnxn(this.zkServer, sock, sk, this, testHandle); + } + + /* + * TestHandle is handed to TestServerCnxn and is used to + * control the amount of time the TestServerCnxn waits + * before allowing an I/O operation. + */ + public class TestHandle { + private Object mu = new Object(); + private int ioWaitMillis = 0; + + /* + * Set an artifical I/O wait (in milliseconds) on ServerCnxn and + * then sleep for the specified number of milliseconds. + */ + public void setIOWaitMillis(int ioWaitMillis, int sleepMillis) { + synchronized(mu) { + this.ioWaitMillis = ioWaitMillis; + } + if (sleepMillis > 0) { + try { + Thread.sleep(sleepMillis); + } catch (InterruptedException e) {} + } + } + + /* + * Get the number of milliseconds to wait before + * allowing ServerCnxn to perform I/O. + */ + public int getIOWaitMillis() { + synchronized(mu) { + return this.ioWaitMillis; + } + } + } + + public class TestServerCnxn extends NIOServerCnxn { + public TestHandle testHandle; + + public TestServerCnxn(ZooKeeperServer zk, SocketChannel sock, SelectionKey sk, NIOServerCnxnFactory factory, TestHandle testHandle) throws IOException { + super(zk, sock, sk, factory); + this.testHandle = testHandle; + } + + public void doIO(SelectionKey k) throws InterruptedException { + final int millis = this.testHandle.getIOWaitMillis(); + if (millis > 0) { + LOG.info("imposing a "+millis+" millisecond wait on ServerCxn: "+this); + try { + Thread.sleep(millis); + } catch (InterruptedException e) {} + } + super.doIO(k); + } + } +}