diff --git a/activemq-broker/src/main/java/org/apache/activemq/broker/BrokerService.java b/activemq-broker/src/main/java/org/apache/activemq/broker/BrokerService.java index 575e313129..fe6f59b90f 100644 --- a/activemq-broker/src/main/java/org/apache/activemq/broker/BrokerService.java +++ b/activemq-broker/src/main/java/org/apache/activemq/broker/BrokerService.java @@ -878,6 +878,10 @@ public class BrokerService implements Service { } } + public boolean isStopped() { + return stopped.get(); + } + /** * A helper method to block the caller thread until the broker has fully started * @return boolean true if wait succeeded false if broker was not started or was stopped diff --git a/activemq-leveldb-store/src/main/scala/org/apache/activemq/leveldb/DBManager.scala b/activemq-leveldb-store/src/main/scala/org/apache/activemq/leveldb/DBManager.scala index 722d9327bc..cfcce780e0 100644 --- a/activemq-leveldb-store/src/main/scala/org/apache/activemq/leveldb/DBManager.scala +++ b/activemq-leveldb-store/src/main/scala/org/apache/activemq/leveldb/DBManager.scala @@ -35,6 +35,7 @@ import util.TimeMetric import scala.Some import org.apache.activemq.ActiveMQMessageAuditNoSync import org.fusesource.hawtdispatch +import org.apache.activemq.broker.SuppressReplyException case class EntryLocator(qid:Long, seq:Long) case class DataLocator(store:LevelDBStore, pos:Long, len:Int) { @@ -569,9 +570,6 @@ class DBManager(val parent:LevelDBStore) { def drainFlushes:Unit = { dispatchQueue.assertExecuting() - if( !started ) { - return - } // Some UOWs may have been canceled. import collection.JavaConversions._ @@ -590,7 +588,12 @@ class DBManager(val parent:LevelDBStore) { assert(action!=null) } } - Some(uow) + if( !started ) { + uow.onCompleted(new SuppressReplyException("Store stopped")) + None + } else { + Some(uow) + } } } diff --git a/activemq-leveldb-store/src/test/java/org/apache/activemq/leveldb/test/ReplicatedLevelDBBrokerTest.java b/activemq-leveldb-store/src/test/java/org/apache/activemq/leveldb/test/ReplicatedLevelDBBrokerTest.java index a8e743fa26..891098120f 100644 --- a/activemq-leveldb-store/src/test/java/org/apache/activemq/leveldb/test/ReplicatedLevelDBBrokerTest.java +++ b/activemq-leveldb-store/src/test/java/org/apache/activemq/leveldb/test/ReplicatedLevelDBBrokerTest.java @@ -22,6 +22,7 @@ import org.apache.activemq.broker.TransportConnector; import org.apache.activemq.leveldb.replicated.ElectingLevelDBStore; import org.junit.After; import org.junit.Before; +import org.junit.Ignore; import org.junit.Test; import javax.jms.*; @@ -30,10 +31,17 @@ import javax.management.openmbean.CompositeData; import java.io.File; import java.io.IOException; import java.lang.management.ManagementFactory; +import java.net.ServerSocket; +import java.net.URI; +import java.net.URISyntaxException; import java.util.ArrayList; import java.util.Enumeration; +import java.util.concurrent.ArrayBlockingQueue; import java.util.concurrent.SynchronousQueue; import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicLong; + import org.apache.commons.io.FileUtils; import static org.junit.Assert.*; @@ -82,6 +90,165 @@ public class ReplicatedLevelDBBrokerTest extends ZooKeeperTestSupport { } } + public interface Client{ + public void execute(Connection connection) throws Exception; + } + + protected Thread startFailoverClient(String name, final Client client) throws IOException, URISyntaxException { + String url = "failover://(tcp://localhost:"+port+")?maxReconnectDelay=500&nested.wireFormat.maxInactivityDuration=1000"; + final ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory(url); + Thread rc = new Thread(name) { + @Override + public void run() { + Connection connection = null; + try { + connection = factory.createConnection(); + client.execute(connection); + } catch (Throwable e) { + e.printStackTrace(); + } finally { + try { + connection.close(); + } catch (JMSException e) { + } + } + } + }; + rc.start(); + return rc; + } + + @Test + @Ignore + public void testReplicationQuorumLoss() throws Throwable { + + System.out.println("======================================"); + System.out.println(" Start 2 ActiveMQ nodes."); + System.out.println("======================================"); + startBrokerAsync(createBrokerNode("node-1", port)); + startBrokerAsync(createBrokerNode("node-2", port)); + BrokerService master = waitForNextMaster(); + System.out.println("======================================"); + System.out.println(" Start the producer and consumer"); + System.out.println("======================================"); + + final AtomicBoolean stopClients = new AtomicBoolean(false); + final ArrayBlockingQueue errors = new ArrayBlockingQueue(100); + final AtomicLong receivedCounter = new AtomicLong(); + final AtomicLong sentCounter = new AtomicLong(); + Thread producer = startFailoverClient("producer", new Client() { + @Override + public void execute(Connection connection) throws Exception { + Session session = connection.createSession(false, Session.CLIENT_ACKNOWLEDGE); + MessageProducer producer = session.createProducer(session.createQueue("test")); + long actual = 0; + while(!stopClients.get()) { + TextMessage msg = session.createTextMessage("Hello World"); + msg.setLongProperty("id", actual++); + producer.send(msg); + sentCounter.incrementAndGet(); + } + } + }); + + Thread consumer = startFailoverClient("consumer", new Client() { + @Override + public void execute(Connection connection) throws Exception { + connection.start(); + Session session = connection.createSession(false, Session.CLIENT_ACKNOWLEDGE); + MessageConsumer consumer = session.createConsumer(session.createQueue("test")); + long expected = 0; + while(!stopClients.get()) { + Message msg = consumer.receive(200); + if( msg!=null ) { + long actual = msg.getLongProperty("id"); + if( actual != expected ) { + errors.offer("Received got unexpected msg id: "+actual+", expected: "+expected); + } + msg.acknowledge(); + expected = actual+1; + receivedCounter.incrementAndGet(); + } + } + } + }); + + try { + assertCounterMakesProgress(sentCounter, 10, TimeUnit.SECONDS); + assertCounterMakesProgress(receivedCounter, 5, TimeUnit.SECONDS); + assertNull(errors.poll()); + + System.out.println("======================================"); + System.out.println(" Master should stop once the quorum is lost."); + System.out.println("======================================"); + ArrayList stopped = stopSlaves();// stopping the slaves should kill the quorum. + assertStopsWithin(master, 10, TimeUnit.SECONDS); + assertNull(errors.poll()); // clients should not see an error since they are failover clients. + stopped.add(master); + + System.out.println("======================================"); + System.out.println(" Restart the slave. Clients should make progress again.."); + System.out.println("======================================"); + startBrokersAsync(createBrokerNodes(stopped)); + assertCounterMakesProgress(sentCounter, 10, TimeUnit.SECONDS); + assertCounterMakesProgress(receivedCounter, 5, TimeUnit.SECONDS); + assertNull(errors.poll()); + } catch (Throwable e) { + e.printStackTrace(); + throw e; + } finally { + // Wait for the clients to stop.. + stopClients.set(true); + producer.join(); + consumer.join(); + } + } + + protected void startBrokersAsync(ArrayList brokers) { + for (BrokerService broker : brokers) { + startBrokerAsync(broker); + } + } + + protected ArrayList createBrokerNodes(ArrayList brokers) throws Exception { + ArrayList rc = new ArrayList(); + for (BrokerService b : brokers) { + rc.add(createBrokerNode(b.getBrokerName(), connectPort(b))); + } + return rc; + } + + protected ArrayList stopSlaves() throws Exception { + ArrayList rc = new ArrayList(); + for (BrokerService broker : brokers) { + if( broker.isSlave() ) { + System.out.println("Stopping slave: "+broker.getBrokerName()); + broker.stop(); + broker.waitUntilStopped(); + rc.add(broker); + } + } + brokers.removeAll(rc); + return rc; + } + + protected void assertStopsWithin(final BrokerService master, int timeout, TimeUnit unit) throws InterruptedException { + within(timeout, unit, new Task(){ + @Override + public void run() throws Exception { + assertTrue(master.isStopped()); + } + }); + } + + protected void assertCounterMakesProgress(final AtomicLong counter, int timeout, TimeUnit unit) throws InterruptedException { + final long initial = counter.get(); + within(timeout, unit, new Task(){ + public void run() throws Exception { + assertTrue(initial < counter.get()); + } + }); + } public void testAMQ4837(boolean jmx) throws Throwable { @@ -205,8 +372,7 @@ public class ReplicatedLevelDBBrokerTest extends ZooKeeperTestSupport { private ArrayList browseMessagesViaJMS(BrokerService brokerService) throws Exception { ArrayList rc = new ArrayList(); - TransportConnector connector = brokerService.getTransportConnectors().get(0); - ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory(connector.getConnectUri()); + ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory("tcp://localhost:"+ connectPort(brokerService)); Connection connection = factory.createConnection(); try { connection.start(); @@ -223,6 +389,19 @@ public class ReplicatedLevelDBBrokerTest extends ZooKeeperTestSupport { return rc; } + private int connectPort(BrokerService brokerService) throws IOException, URISyntaxException { + TransportConnector connector = brokerService.getTransportConnectors().get(0); + return connector.getConnectUri().getPort(); + } + + int port; + @Before + public void findFreePort() throws Exception { + ServerSocket socket = new ServerSocket(0); + port = socket.getLocalPort(); + socket.close(); + } + @After public void stopBrokers() throws Exception { for (BrokerService broker : brokers) { @@ -235,12 +414,18 @@ public class ReplicatedLevelDBBrokerTest extends ZooKeeperTestSupport { } private BrokerService createBrokerNode(String id) throws Exception { + return createBrokerNode(id, 0); + } + + private BrokerService createBrokerNode(String id, int port) throws Exception { BrokerService bs = new BrokerService(); bs.getManagementContext().setCreateConnector(false); brokers.add(bs); bs.setBrokerName(id); bs.setPersistenceAdapter(createStoreNode(id)); - bs.addConnector("tcp://0.0.0.0:0"); + TransportConnector connector = new TransportConnector(); + connector.setUri(new URI("tcp://0.0.0.0:" + port)); + bs.addConnector(connector); return bs; }