From f206a1bd11dfe8265faacfdbbb74113894069d28 Mon Sep 17 00:00:00 2001 From: Gary Tully Date: Fri, 23 Jul 2010 15:31:10 +0000 Subject: [PATCH] resolve https://issues.apache.org/activemq/browse/AMQ-2580 - patch was good for AMQ store, fix needed for kahaDB and JDBC now done, test case was great, thanks. git-svn-id: https://svn.apache.org/repos/asf/activemq/trunk@967134 13f79535-47bb-0310-9956-ffa450edef68 --- .../store/amq/RecoveryListenerAdapter.java | 9 +- .../store/jdbc/JDBCTopicMessageStore.java | 9 +- .../jdbc/adapter/DefaultJDBCAdapter.java | 6 +- .../kahadaptor/KahaTopicReferenceStore.java | 2 - .../activemq/store/kahadb/KahaDBStore.java | 5 +- .../apache/activemq/JmsTopicSelectorTest.java | 8 - .../java/org/apache/activemq/TestSupport.java | 41 +- .../org/apache/activemq/bugs/AMQ2580Test.java | 202 ++++++ .../failover/FailoverTransactionTest.java | 626 ++++++++---------- 9 files changed, 536 insertions(+), 372 deletions(-) create mode 100644 activemq-core/src/test/java/org/apache/activemq/bugs/AMQ2580Test.java diff --git a/activemq-core/src/main/java/org/apache/activemq/store/amq/RecoveryListenerAdapter.java b/activemq-core/src/main/java/org/apache/activemq/store/amq/RecoveryListenerAdapter.java index 2699ae7528..4b5cfa24d8 100644 --- a/activemq-core/src/main/java/org/apache/activemq/store/amq/RecoveryListenerAdapter.java +++ b/activemq-core/src/main/java/org/apache/activemq/store/amq/RecoveryListenerAdapter.java @@ -46,10 +46,11 @@ final class RecoveryListenerAdapter implements MessageRecoveryListener { public boolean recoverMessage(Message message) throws Exception { if (listener.hasSpace()) { - listener.recoverMessage(message); - lastRecovered = message.getMessageId(); - count++; - return true; + if (listener.recoverMessage(message)) { + lastRecovered = message.getMessageId(); + count++; + return true; + } } return false; } diff --git a/activemq-core/src/main/java/org/apache/activemq/store/jdbc/JDBCTopicMessageStore.java b/activemq-core/src/main/java/org/apache/activemq/store/jdbc/JDBCTopicMessageStore.java index d51f1ff58c..f9d16258f5 100755 --- a/activemq-core/src/main/java/org/apache/activemq/store/jdbc/JDBCTopicMessageStore.java +++ b/activemq-core/src/main/java/org/apache/activemq/store/jdbc/JDBCTopicMessageStore.java @@ -108,10 +108,11 @@ public class JDBCTopicMessageStore extends JDBCMessageStore implements TopicMess if (listener.hasSpace()) { Message msg = (Message)wireFormat.unmarshal(new ByteSequence(data)); msg.getMessageId().setBrokerSequenceId(sequenceId); - listener.recoverMessage(msg); - finalLast.set(sequenceId); - finalPriority.set(msg.getPriority()); - return true; + if (listener.recoverMessage(msg)) { + finalLast.set(sequenceId); + finalPriority.set(msg.getPriority()); + return true; + } } return false; } diff --git a/activemq-core/src/main/java/org/apache/activemq/store/jdbc/adapter/DefaultJDBCAdapter.java b/activemq-core/src/main/java/org/apache/activemq/store/jdbc/adapter/DefaultJDBCAdapter.java index ae516a566b..f6cfeabca4 100755 --- a/activemq-core/src/main/java/org/apache/activemq/store/jdbc/adapter/DefaultJDBCAdapter.java +++ b/activemq-core/src/main/java/org/apache/activemq/store/jdbc/adapter/DefaultJDBCAdapter.java @@ -451,7 +451,7 @@ public class DefaultJDBCAdapter implements JDBCAdapter { } else { s = c.getConnection().prepareStatement(this.statements.getFindDurableSubMessagesStatement()); } - s.setMaxRows(maxReturned); + // no set max rows as selectors may need to scan more than maxReturned messages to get what they need s.setString(1, destination.getQualifiedName()); s.setString(2, clientId); s.setString(3, subscriptionName); @@ -466,16 +466,12 @@ public class DefaultJDBCAdapter implements JDBCAdapter { while (rs.next() && count < maxReturned) { if (listener.recoverMessageReference(rs.getString(1))) { count++; - } else { - break; } } } else { while (rs.next() && count < maxReturned) { if (listener.recoverMessage(rs.getLong(1), getBinaryData(rs, 2))) { count++; - } else { - break; } } } diff --git a/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/KahaTopicReferenceStore.java b/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/KahaTopicReferenceStore.java index 196a89b81d..8724affaf1 100644 --- a/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/KahaTopicReferenceStore.java +++ b/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/KahaTopicReferenceStore.java @@ -305,8 +305,6 @@ public class KahaTopicReferenceStore extends KahaReferenceStore implements Topic if (recoverReference(listener, msg)) { count++; container.setBatchEntry(msg.getMessageId(), entry); - } else { - break; } } else { container.reset(); diff --git a/activemq-core/src/main/java/org/apache/activemq/store/kahadb/KahaDBStore.java b/activemq-core/src/main/java/org/apache/activemq/store/kahadb/KahaDBStore.java index f09d622dcf..9fcaa6b544 100644 --- a/activemq-core/src/main/java/org/apache/activemq/store/kahadb/KahaDBStore.java +++ b/activemq-core/src/main/java/org/apache/activemq/store/kahadb/KahaDBStore.java @@ -798,8 +798,9 @@ public class KahaDBStore extends MessageDatabase implements PersistenceAdapter { for (Iterator> iterator = sd.orderIndex.iterator(tx, cursorPos); iterator .hasNext();) { entry = iterator.next(); - listener.recoverMessage(loadMessage(entry.getValue().location)); - counter++; + if (listener.recoverMessage(loadMessage(entry.getValue().location))) { + counter++; + } if (counter >= maxReturned || listener.hasSpace() == false) { break; } diff --git a/activemq-core/src/test/java/org/apache/activemq/JmsTopicSelectorTest.java b/activemq-core/src/test/java/org/apache/activemq/JmsTopicSelectorTest.java index 7337a67171..727c9ede9b 100755 --- a/activemq-core/src/test/java/org/apache/activemq/JmsTopicSelectorTest.java +++ b/activemq-core/src/test/java/org/apache/activemq/JmsTopicSelectorTest.java @@ -47,14 +47,6 @@ public class JmsTopicSelectorTest extends TestSupport { protected boolean durable; protected int deliveryMode = DeliveryMode.PERSISTENT; - public JmsTopicSelectorTest() { - super(); - } - - public JmsTopicSelectorTest(String name) { - super(name); - } - public void setUp() throws Exception { super.setUp(); diff --git a/activemq-core/src/test/java/org/apache/activemq/TestSupport.java b/activemq-core/src/test/java/org/apache/activemq/TestSupport.java index 873e03fd83..48f266f10e 100755 --- a/activemq-core/src/test/java/org/apache/activemq/TestSupport.java +++ b/activemq-core/src/test/java/org/apache/activemq/TestSupport.java @@ -17,6 +17,8 @@ package org.apache.activemq; import java.io.File; +import java.io.IOException; +import java.util.Enumeration; import java.util.Map; import javax.jms.Connection; @@ -34,6 +36,10 @@ import org.apache.activemq.command.ActiveMQDestination; import org.apache.activemq.command.ActiveMQMessage; import org.apache.activemq.command.ActiveMQQueue; import org.apache.activemq.command.ActiveMQTopic; +import org.apache.activemq.store.PersistenceAdapter; +import org.apache.activemq.store.amq.AMQPersistenceAdapter; +import org.apache.activemq.store.jdbc.JDBCPersistenceAdapter; +import org.apache.activemq.store.kahadb.KahaDBPersistenceAdapter; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; @@ -42,18 +48,11 @@ import org.apache.commons.logging.LogFactory; * * @version $Revision: 1.5 $ */ -public class TestSupport extends TestCase { +public abstract class TestSupport extends CombinationTestSupport { protected ActiveMQConnectionFactory connectionFactory; protected boolean topic = true; - - public TestSupport() { - super(); - } - - public TestSupport(String name) { - super(name); - } + public PersistenceAdapterChoice defaultPersistenceAdapter = PersistenceAdapterChoice.KahaDB; protected ActiveMQMessage createMessage() { return new ActiveMQMessage(); @@ -173,4 +172,28 @@ public class TestSupport extends TestCase { regionBroker.getQueueRegion().getDestinationMap() : regionBroker.getTopicRegion().getDestinationMap(); } + + public static enum PersistenceAdapterChoice {KahaDB, AMQ, JDBC }; + + public PersistenceAdapter setDefaultPersistenceAdapter(BrokerService broker) throws IOException { + return setPersistenceAdapter(broker, defaultPersistenceAdapter); + } + + public PersistenceAdapter setPersistenceAdapter(BrokerService broker, PersistenceAdapterChoice choice) throws IOException { + PersistenceAdapter adapter = null; + switch (choice) { + case AMQ: + adapter = new AMQPersistenceAdapter(); + break; + case JDBC: + adapter = new JDBCPersistenceAdapter(); + break; + case KahaDB: + adapter = new KahaDBPersistenceAdapter(); + break; + } + broker.setPersistenceAdapter(adapter); + return adapter; + } + } diff --git a/activemq-core/src/test/java/org/apache/activemq/bugs/AMQ2580Test.java b/activemq-core/src/test/java/org/apache/activemq/bugs/AMQ2580Test.java new file mode 100644 index 0000000000..4c626f9d32 --- /dev/null +++ b/activemq-core/src/test/java/org/apache/activemq/bugs/AMQ2580Test.java @@ -0,0 +1,202 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.activemq.bugs; + +import junit.framework.Test; +import org.apache.activemq.ActiveMQConnectionFactory; +import org.apache.activemq.ActiveMQPrefetchPolicy; +import org.apache.activemq.TestSupport; +import org.apache.activemq.broker.BrokerService; +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; + +import javax.jms.ConnectionFactory; +import javax.jms.JMSException; +import javax.jms.MessageConsumer; +import javax.jms.MessageProducer; +import javax.jms.Session; +import javax.jms.TextMessage; +import javax.jms.Topic; +import javax.jms.TopicConnection; +import javax.jms.TopicSession; + +public class AMQ2580Test extends TestSupport { + + private static final Log LOG = LogFactory.getLog(AMQ2580Test.class); + + private static final String TOPIC_NAME = "topicName"; + private static final String CLIENT_ID = "client_id"; + private static final String textOfSelectedMsg = "good_message"; + + protected TopicConnection connection; + + private Topic topic; + private Session session; + private MessageProducer producer; + private ConnectionFactory connectionFactory; + private TopicConnection topicConnection; + private BrokerService service; + + public static Test suite() { + return suite(AMQ2580Test.class); + } + + protected void setUp() throws Exception { + super.setUp(); + initDurableBroker(); + initConnectionFactory(); + initTopic(); + } + + protected void tearDown() throws Exception { + shutdownClient(); + service.stop(); + super.tearDown(); + } + + private void initConnection() throws JMSException { + if (connection == null) { + LOG.info("Initializing connection"); + + connection = (TopicConnection) connectionFactory.createConnection(); + connection.start(); + } + } + + public void initCombosForTestTopicIsDurableSmokeTest() throws Exception { + addCombinationValues("defaultPersistenceAdapter", PersistenceAdapterChoice.values()); + } + + public void testTopicIsDurableSmokeTest() throws Exception { + + initClient(); + MessageConsumer consumer = createMessageConsumer(); + LOG.info("Consuming message"); + assertNull(consumer.receive(1)); + shutdownClient(); + consumer.close(); + + sendMessages(); + shutdownClient(); + + initClient(); + consumer = createMessageConsumer(); + + LOG.info("Consuming message"); + TextMessage answer1 = (TextMessage) consumer.receive(1000); + assertNotNull("we got our message", answer1); + + consumer.close(); + } + + private MessageConsumer createMessageConsumer() throws JMSException { + LOG.info("creating durable subscriber"); + return session.createDurableSubscriber(topic, + TOPIC_NAME, + "name='value'", + false); + } + + private void initClient() throws JMSException { + LOG.info("Initializing client"); + + initConnection(); + initSession(); + } + + private void shutdownClient() + throws JMSException { + LOG.info("Closing session and connection"); + session.close(); + connection.close(); + session = null; + connection = null; + } + + private void sendMessages() + throws JMSException { + initConnection(); + + initSession(); + + LOG.info("Creating producer"); + producer = session.createProducer(topic); + + sendMessageThatFailsSelection(); + + sendMessage(textOfSelectedMsg, "value"); + } + + private void initSession() throws JMSException { + LOG.info("Initializing session"); + session = connection.createTopicSession(false, Session.AUTO_ACKNOWLEDGE); + } + + private void sendMessageThatFailsSelection() throws JMSException { + for (int i = 0; i < 5; i++) { + String textOfNotSelectedMsg = "Msg_" + i; + sendMessage(textOfNotSelectedMsg, "not_value"); + LOG.info("#"); + } + } + + private void sendMessage( + String msgText, + String propertyValue) throws JMSException { + LOG.info("Creating message: " + msgText); + TextMessage messageToSelect = session.createTextMessage(msgText); + messageToSelect.setStringProperty("name", propertyValue); + LOG.info("Sending message"); + producer.send(messageToSelect); + } + + protected void initConnectionFactory() throws Exception { + ActiveMQConnectionFactory activeMqConnectionFactory = createActiveMqConnectionFactory(); + connectionFactory = activeMqConnectionFactory; + } + + + private ActiveMQConnectionFactory createActiveMqConnectionFactory() throws Exception { + ActiveMQConnectionFactory activeMqConnectionFactory = new ActiveMQConnectionFactory( + "failover:" + service.getTransportConnectors().get(0).getConnectUri().toString()); + activeMqConnectionFactory.setWatchTopicAdvisories(false); + ActiveMQPrefetchPolicy prefetchPolicy = new ActiveMQPrefetchPolicy(); + prefetchPolicy.setDurableTopicPrefetch(2); + prefetchPolicy.setOptimizeDurableTopicPrefetch(2); + activeMqConnectionFactory.setPrefetchPolicy(prefetchPolicy); + activeMqConnectionFactory.setClientID(CLIENT_ID); + return activeMqConnectionFactory; + } + + private void initDurableBroker() throws Exception { + service = new BrokerService(); + setDefaultPersistenceAdapter(service); + service.setDeleteAllMessagesOnStartup(true); + service.setAdvisorySupport(false); + service.setTransportConnectorURIs(new String[]{"tcp://localhost:0"}); + service.setPersistent(true); + service.setUseJmx(false); + service.start(); + + } + + private void initTopic() throws JMSException { + topicConnection = (TopicConnection) connectionFactory.createConnection(); + TopicSession topicSession = topicConnection.createTopicSession(false, Session.AUTO_ACKNOWLEDGE); + topic = topicSession.createTopic(TOPIC_NAME); + } +} diff --git a/activemq-core/src/test/java/org/apache/activemq/transport/failover/FailoverTransactionTest.java b/activemq-core/src/test/java/org/apache/activemq/transport/failover/FailoverTransactionTest.java index d18d01e7bf..c900e659b4 100644 --- a/activemq-core/src/test/java/org/apache/activemq/transport/failover/FailoverTransactionTest.java +++ b/activemq-core/src/test/java/org/apache/activemq/transport/failover/FailoverTransactionTest.java @@ -16,20 +16,24 @@ */ package org.apache.activemq.transport.failover; -import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertNotNull; -import static org.junit.Assert.assertNull; -import static org.junit.Assert.assertTrue; -import static org.junit.Assert.fail; - -import java.io.File; -import java.io.IOException; -import java.net.URI; -import java.util.Vector; -import java.util.concurrent.CountDownLatch; -import java.util.concurrent.Executors; -import java.util.concurrent.TimeUnit; -import java.util.concurrent.atomic.AtomicBoolean; +import junit.framework.Test; +import org.apache.activemq.ActiveMQConnectionFactory; +import org.apache.activemq.AutoFailTestSupport; +import org.apache.activemq.TestSupport; +import org.apache.activemq.broker.BrokerPlugin; +import org.apache.activemq.broker.BrokerPluginSupport; +import org.apache.activemq.broker.BrokerService; +import org.apache.activemq.broker.ConnectionContext; +import org.apache.activemq.broker.ConsumerBrokerExchange; +import org.apache.activemq.broker.ProducerBrokerExchange; +import org.apache.activemq.broker.region.RegionBroker; +import org.apache.activemq.command.MessageAck; +import org.apache.activemq.command.TransactionId; +import org.apache.activemq.store.PersistenceAdapter; +import org.apache.activemq.store.kahadb.KahaDBPersistenceAdapter; +import org.apache.activemq.util.SocketProxy; +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; import javax.jms.Connection; import javax.jms.JMSException; @@ -42,112 +46,94 @@ import javax.jms.ServerSessionPool; import javax.jms.Session; import javax.jms.TextMessage; import javax.jms.TransactionRolledBackException; - -import org.apache.activemq.ActiveMQConnectionFactory; -import org.apache.activemq.AutoFailTestSupport; -import org.apache.activemq.broker.BrokerPlugin; -import org.apache.activemq.broker.BrokerPluginSupport; -import org.apache.activemq.broker.BrokerService; -import org.apache.activemq.broker.ConnectionContext; -import org.apache.activemq.broker.ConsumerBrokerExchange; -import org.apache.activemq.broker.ProducerBrokerExchange; -import org.apache.activemq.broker.region.RegionBroker; -import org.apache.activemq.command.MessageAck; -import org.apache.activemq.command.TransactionId; -import org.apache.activemq.store.amq.AMQPersistenceAdapter; -import org.apache.activemq.store.jdbc.JDBCPersistenceAdapter; -import org.apache.activemq.store.kahadb.KahaDBPersistenceAdapter; -import org.apache.activemq.util.SocketProxy; -import org.apache.commons.logging.Log; -import org.apache.commons.logging.LogFactory; -import org.junit.After; -import org.junit.Test; +import java.net.URI; +import java.util.Vector; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.Executors; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; // see https://issues.apache.org/activemq/browse/AMQ-2473 + // https://issues.apache.org/activemq/browse/AMQ-2590 -public class FailoverTransactionTest { - +public class FailoverTransactionTest extends TestSupport { + private static final Log LOG = LogFactory.getLog(FailoverTransactionTest.class); - private static final String QUEUE_NAME = "FailoverWithTx"; - private String url = "tcp://localhost:61616"; - BrokerService broker; - - public void startCleanBroker() throws Exception { - startBroker(true); - } - - @After - public void stopBroker() throws Exception { - if (broker != null) { - broker.stop(); - } - } - - public void startBroker(boolean deleteAllMessagesOnStartup) throws Exception { - broker = createBroker(deleteAllMessagesOnStartup); + private static final String QUEUE_NAME = "FailoverWithTx"; + private String url = "tcp://localhost:61616"; + BrokerService broker; + + public static Test suite() { + return suite(FailoverTransactionTest.class); + } + + public void startCleanBroker() throws Exception { + startBroker(true); + } + + public void tearDown() throws Exception { + stopBroker(); + } + + public void stopBroker() throws Exception { + if (broker != null) { + broker.stop(); + } + } + + public void startBroker(boolean deleteAllMessagesOnStartup) throws Exception { + broker = createBroker(deleteAllMessagesOnStartup); broker.start(); - } + } - public BrokerService createBroker(boolean deleteAllMessagesOnStartup) throws Exception { - broker = new BrokerService(); - broker.setUseJmx(false); - broker.setAdvisorySupport(false); - broker.addConnector(url); - broker.setDeleteAllMessagesOnStartup(deleteAllMessagesOnStartup); - return broker; - } + public BrokerService createBroker(boolean deleteAllMessagesOnStartup) throws Exception { + broker = new BrokerService(); + broker.setUseJmx(false); + broker.setAdvisorySupport(false); + broker.addConnector(url); + broker.setDeleteAllMessagesOnStartup(deleteAllMessagesOnStartup); + return broker; + } - @Test - public void testFailoverProducerCloseBeforeTransaction() throws Exception { - startCleanBroker(); - ActiveMQConnectionFactory cf = new ActiveMQConnectionFactory("failover:(" + url + ")"); - Connection connection = cf.createConnection(); - connection.start(); - Session session = connection.createSession(true, Session.AUTO_ACKNOWLEDGE); - Queue destination = session.createQueue(QUEUE_NAME); + public void testFailoverProducerCloseBeforeTransaction() throws Exception { + startCleanBroker(); + ActiveMQConnectionFactory cf = new ActiveMQConnectionFactory("failover:(" + url + ")"); + Connection connection = cf.createConnection(); + connection.start(); + Session session = connection.createSession(true, Session.AUTO_ACKNOWLEDGE); + Queue destination = session.createQueue(QUEUE_NAME); MessageConsumer consumer = session.createConsumer(destination); - produceMessage(session, destination); - - // restart to force failover and connection state recovery before the commit - broker.stop(); - startBroker(false); + produceMessage(session, destination); - session.commit(); - assertNotNull("we got the message", consumer.receive(20000)); - session.commit(); - connection.close(); - } - - @Test - public void testFailoverCommitReplyLostAMQ() throws Exception { - doTestFailoverCommitReplyLost(0); - } - - @Test - public void testFailoverCommitReplyLostJdbc() throws Exception { - doTestFailoverCommitReplyLost(1); + // restart to force failover and connection state recovery before the commit + broker.stop(); + startBroker(false); + + session.commit(); + assertNotNull("we got the message", consumer.receive(20000)); + session.commit(); + connection.close(); } - - @Test - public void testFailoverCommitReplyLostKahaDB() throws Exception { - doTestFailoverCommitReplyLost(2); + + public void initCombosForTestFailoverCommitReplyLost() { + addCombinationValues("defaultPersistenceAdapter", PersistenceAdapterChoice.values()); } - - public void doTestFailoverCommitReplyLost(final int adapter) throws Exception { - + + public void testFailoverCommitReplyLost() throws Exception { + broker = createBroker(true); - setPersistenceAdapter(adapter); - - broker.setPlugins(new BrokerPlugin[] { + setDefaultPersistenceAdapter(broker); + + broker.setPlugins(new BrokerPlugin[]{ new BrokerPluginSupport() { @Override public void commitTransaction(ConnectionContext context, - TransactionId xid, boolean onePhase) throws Exception { + TransactionId xid, boolean onePhase) throws Exception { super.commitTransaction(context, xid, onePhase); // so commit will hang as if reply is lost context.setDontSendReponse(true); - Executors.newSingleThreadExecutor().execute(new Runnable() { + Executors.newSingleThreadExecutor().execute(new Runnable() { public void run() { LOG.info("Stopping broker post commit..."); try { @@ -157,11 +143,11 @@ public class FailoverTransactionTest { } } }); - } + } } }); broker.start(); - + ActiveMQConnectionFactory cf = new ActiveMQConnectionFactory("failover:(" + url + ")"); Connection connection = cf.createConnection(); connection.start(); @@ -170,10 +156,10 @@ public class FailoverTransactionTest { MessageConsumer consumer = session.createConsumer(destination); produceMessage(session, destination); - + final CountDownLatch commitDoneLatch = new CountDownLatch(1); // broker will die on commit reply so this will hang till restart - Executors.newSingleThreadExecutor().execute(new Runnable() { + Executors.newSingleThreadExecutor().execute(new Runnable() { public void run() { LOG.info("doing async commit..."); try { @@ -186,15 +172,15 @@ public class FailoverTransactionTest { LOG.info("done async commit"); } }); - + // will be stopped by the plugin broker.waitUntilStopped(); broker = createBroker(false); - setPersistenceAdapter(adapter); + setDefaultPersistenceAdapter(broker); broker.start(); assertTrue("tx committed trough failover", commitDoneLatch.await(30, TimeUnit.SECONDS)); - + // new transaction Message msg = consumer.receive(20000); LOG.info("Received: " + msg); @@ -203,16 +189,16 @@ public class FailoverTransactionTest { session.commit(); consumer.close(); connection.close(); - + // ensure no dangling messages with fresh broker etc broker.stop(); broker.waitUntilStopped(); - + LOG.info("Checking for remaining/hung messages.."); broker = createBroker(false); - setPersistenceAdapter(adapter); + setDefaultPersistenceAdapter(broker); broker.start(); - + // after restart, ensure no dangling messages cf = new ActiveMQConnectionFactory("failover:(" + url + ")"); connection = cf.createConnection(); @@ -228,37 +214,30 @@ public class FailoverTransactionTest { connection.close(); } - - //@Test not implemented - public void testFailoverSendReplyLostAMQ() throws Exception { - doTestFailoverSendReplyLost(0); - } - - @Test - public void testFailoverSendReplyLostJdbc() throws Exception { - doTestFailoverSendReplyLost(1); + + public void initCombosForTestFailoverSendReplyLost() { + addCombinationValues("defaultPersistenceAdapter", + new Object[]{PersistenceAdapterChoice.KahaDB, + PersistenceAdapterChoice.JDBC + // not implemented for AMQ store + }); } - - @Test - public void testFailoverSendReplyLostKahaDB() throws Exception { - doTestFailoverSendReplyLost(2); - } - - public void doTestFailoverSendReplyLost(final int adapter) throws Exception { - + + public void testFailoverSendReplyLost() throws Exception { + broker = createBroker(true); - setPersistenceAdapter(adapter); - - broker.setPlugins(new BrokerPlugin[] { + setDefaultPersistenceAdapter(broker); + + broker.setPlugins(new BrokerPlugin[]{ new BrokerPluginSupport() { @Override public void send(ProducerBrokerExchange producerExchange, - org.apache.activemq.command.Message messageSend) + org.apache.activemq.command.Message messageSend) throws Exception { // so send will hang as if reply is lost super.send(producerExchange, messageSend); producerExchange.getConnectionContext().setDontSendReponse(true); - Executors.newSingleThreadExecutor().execute(new Runnable() { + Executors.newSingleThreadExecutor().execute(new Runnable() { public void run() { LOG.info("Stopping broker post send..."); try { @@ -272,7 +251,7 @@ public class FailoverTransactionTest { } }); broker.start(); - + ActiveMQConnectionFactory cf = new ActiveMQConnectionFactory("failover:(" + url + ")?jms.watchTopicAdvisories=false"); Connection connection = cf.createConnection(); connection.start(); @@ -282,7 +261,7 @@ public class FailoverTransactionTest { MessageConsumer consumer = session.createConsumer(destination); final CountDownLatch sendDoneLatch = new CountDownLatch(1); // broker will die on send reply so this will hang till restart - Executors.newSingleThreadExecutor().execute(new Runnable() { + Executors.newSingleThreadExecutor().execute(new Runnable() { public void run() { LOG.info("doing async send..."); try { @@ -296,16 +275,16 @@ public class FailoverTransactionTest { LOG.info("done async send"); } }); - + // will be stopped by the plugin broker.waitUntilStopped(); broker = createBroker(false); - setPersistenceAdapter(adapter); + setDefaultPersistenceAdapter(broker); LOG.info("restarting...."); broker.start(); assertTrue("message sent through failover", sendDoneLatch.await(30, TimeUnit.SECONDS)); - + // new transaction Message msg = consumer.receive(20000); LOG.info("Received: " + msg); @@ -313,20 +292,20 @@ public class FailoverTransactionTest { assertNull("we got just one message", consumer.receive(2000)); consumer.close(); connection.close(); - + // verify stats - assertEquals("no newly queued messages", 0, ((RegionBroker)broker.getRegionBroker()).getDestinationStatistics().getEnqueues().getCount()); - assertEquals("1 dequeue", 1, ((RegionBroker)broker.getRegionBroker()).getDestinationStatistics().getDequeues().getCount()); - + assertEquals("no newly queued messages", 0, ((RegionBroker) broker.getRegionBroker()).getDestinationStatistics().getEnqueues().getCount()); + assertEquals("1 dequeue", 1, ((RegionBroker) broker.getRegionBroker()).getDestinationStatistics().getDequeues().getCount()); + // ensure no dangling messages with fresh broker etc broker.stop(); broker.waitUntilStopped(); - + LOG.info("Checking for remaining/hung messages with second restart.."); broker = createBroker(false); - setPersistenceAdapter(adapter); + setDefaultPersistenceAdapter(broker); broker.start(); - + // after restart, ensure no dangling messages cf = new ActiveMQConnectionFactory("failover:(" + url + ")"); connection = cf.createConnection(); @@ -342,43 +321,45 @@ public class FailoverTransactionTest { connection.close(); } - // not implemented.. @Test - public void testFailoverConnectionSendReplyLostAMQ() throws Exception { - doTestFailoverConnectionSendReplyLost(0); - } - - @Test - public void testFailoverConnectionSendReplyLostJdbc() throws Exception { - doTestFailoverConnectionSendReplyLost(1); + + public void initCombosForTestFailoverConnectionSendReplyLost() { + addCombinationValues("defaultPersistenceAdapter", + new Object[]{PersistenceAdapterChoice.KahaDB, + PersistenceAdapterChoice.JDBC + // last producer message id store feature not implemented for AMQ store + }); } - - @Test - public void testFailoverConnectionSendReplyLostKahaDB() throws Exception { - doTestFailoverConnectionSendReplyLost(2); - } - - public void doTestFailoverConnectionSendReplyLost(final int adapter) throws Exception { - + + public void testFailoverConnectionSendReplyLost() throws Exception { + broker = createBroker(true); - setPersistenceAdapter(adapter); - + PersistenceAdapter store = setDefaultPersistenceAdapter(broker); + if (store instanceof KahaDBPersistenceAdapter) { + // duplicate checker not updated on canceled tasks, even it + // it was, recovery of the audit would fail as the message is + // not recorded in the store and the audit may not be up to date. + // So if duplicate messages are a absolute no no after restarts, + // ConcurrentStoreAndDispatchQueues must be disabled + ((KahaDBPersistenceAdapter) store).setConcurrentStoreAndDispatchQueues(false); + } + final SocketProxy proxy = new SocketProxy(); - broker.setPlugins(new BrokerPlugin[] { + broker.setPlugins(new BrokerPlugin[]{ new BrokerPluginSupport() { private boolean firstSend = true; @Override public void send(ProducerBrokerExchange producerExchange, - org.apache.activemq.command.Message messageSend) + org.apache.activemq.command.Message messageSend) throws Exception { // so send will hang as if reply is lost super.send(producerExchange, messageSend); if (firstSend) { firstSend = false; - + producerExchange.getConnectionContext().setDontSendReponse(true); - Executors.newSingleThreadExecutor().execute(new Runnable() { + Executors.newSingleThreadExecutor().execute(new Runnable() { public void run() { LOG.info("Stopping connection post send..."); try { @@ -386,17 +367,17 @@ public class FailoverTransactionTest { } catch (Exception e) { e.printStackTrace(); } - } + } }); } } } }); broker.start(); - + proxy.setTarget(new URI(url)); proxy.open(); - + ActiveMQConnectionFactory cf = new ActiveMQConnectionFactory("failover:(" + proxy.getUrl().toASCIIString() + ")?jms.watchTopicAdvisories=false"); Connection connection = cf.createConnection(); connection.start(); @@ -406,7 +387,7 @@ public class FailoverTransactionTest { MessageConsumer consumer = session.createConsumer(destination); final CountDownLatch sendDoneLatch = new CountDownLatch(1); // proxy connection will die on send reply so this will hang on failover reconnect till open - Executors.newSingleThreadExecutor().execute(new Runnable() { + Executors.newSingleThreadExecutor().execute(new Runnable() { public void run() { LOG.info("doing async send..."); try { @@ -419,33 +400,33 @@ public class FailoverTransactionTest { LOG.info("done async send"); } }); - + // will be closed by the plugin assertTrue("proxy was closed", proxy.waitUntilClosed(30)); LOG.info("restarting proxy"); proxy.open(); assertTrue("message sent through failover", sendDoneLatch.await(30, TimeUnit.SECONDS)); - + Message msg = consumer.receive(20000); LOG.info("Received: " + msg); assertNotNull("we got the message", msg); assertNull("we got just one message", consumer.receive(2000)); consumer.close(); connection.close(); - + // verify stats, connection dup suppression means dups don't get to broker - assertEquals("one queued message", 1, ((RegionBroker)broker.getRegionBroker()).getDestinationStatistics().getEnqueues().getCount()); - + assertEquals("one queued message", 1, ((RegionBroker) broker.getRegionBroker()).getDestinationStatistics().getEnqueues().getCount()); + // ensure no dangling messages with fresh broker etc broker.stop(); broker.waitUntilStopped(); - + LOG.info("Checking for remaining/hung messages with restart.."); broker = createBroker(false); - setPersistenceAdapter(adapter); + setDefaultPersistenceAdapter(broker); broker.start(); - + // after restart, ensure no dangling messages cf = new ActiveMQConnectionFactory("failover:(" + url + ")"); connection = cf.createConnection(); @@ -460,95 +441,68 @@ public class FailoverTransactionTest { assertNull("no messges left dangling but got: " + msg, msg); connection.close(); } - - - - private void setPersistenceAdapter(int adapter) throws IOException { - switch (adapter) { - case 0: - broker.setPersistenceAdapter(new AMQPersistenceAdapter()); - break; - case 1: - broker.setPersistenceAdapter(new JDBCPersistenceAdapter()); - break; - case 2: - KahaDBPersistenceAdapter store = new KahaDBPersistenceAdapter(); - // duplicate checker not updated on canceled tasks, even it - // it was, reovery of the audit would fail as the message is - // not recorded in the store and the audit may not be up to date. - // So if duplicate are a nono (w.r.t stats), this must be disabled - store.setConcurrentStoreAndDispatchQueues(false); - store.setMaxFailoverProducersToTrack(10); - store.setDirectory(new File("target/activemq-data/kahadb/FailoverTransactionTest")); - broker.setPersistenceAdapter(store); - break; - } + + public void testFailoverProducerCloseBeforeTransactionFailWhenDisabled() throws Exception { + startCleanBroker(); + ActiveMQConnectionFactory cf = new ActiveMQConnectionFactory("failover:(" + url + ")?trackTransactionProducers=false"); + Connection connection = cf.createConnection(); + connection.start(); + Session session = connection.createSession(true, Session.AUTO_ACKNOWLEDGE); + Queue destination = session.createQueue(QUEUE_NAME); + + MessageConsumer consumer = session.createConsumer(destination); + produceMessage(session, destination); + + // restart to force failover and connection state recovery before the commit + broker.stop(); + startBroker(false); + + session.commit(); + + // without tracking producers, message will not be replayed on recovery + assertNull("we got the message", consumer.receive(5000)); + session.commit(); + connection.close(); } - @Test - public void testFailoverProducerCloseBeforeTransactionFailWhenDisabled() throws Exception { - startCleanBroker(); - ActiveMQConnectionFactory cf = new ActiveMQConnectionFactory("failover:(" + url + ")?trackTransactionProducers=false"); - Connection connection = cf.createConnection(); - connection.start(); - Session session = connection.createSession(true, Session.AUTO_ACKNOWLEDGE); - Queue destination = session.createQueue(QUEUE_NAME); - - MessageConsumer consumer = session.createConsumer(destination); - produceMessage(session, destination); - - // restart to force failover and connection state recovery before the commit - broker.stop(); - startBroker(false); - - session.commit(); - - // without tracking producers, message will not be replayed on recovery - assertNull("we got the message", consumer.receive(5000)); - session.commit(); - connection.close(); - } - - @Test - public void testFailoverMultipleProducerCloseBeforeTransaction() throws Exception { - startCleanBroker(); - ActiveMQConnectionFactory cf = new ActiveMQConnectionFactory("failover:(" + url + ")"); - Connection connection = cf.createConnection(); - connection.start(); - Session session = connection.createSession(true, Session.AUTO_ACKNOWLEDGE); - Queue destination = session.createQueue(QUEUE_NAME); - - MessageConsumer consumer = session.createConsumer(destination); - MessageProducer producer; - TextMessage message; - final int count = 10; - for (int i=0; i connections = new Vector(); ActiveMQConnectionFactory cf = new ActiveMQConnectionFactory("failover:(" + url + ")"); Connection connection = cf.createConnection(); @@ -642,41 +595,41 @@ public class FailoverTransactionTest { connections.add(connection); final Session producerSession = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); final Queue destination = producerSession.createQueue(QUEUE_NAME + "?consumer.prefetchSize=1"); - + connection = cf.createConnection(); connection.start(); connections.add(connection); final Session consumerSession1 = connection.createSession(true, Session.AUTO_ACKNOWLEDGE); - + connection = cf.createConnection(); connection.start(); connections.add(connection); final Session consumerSession2 = connection.createSession(true, Session.AUTO_ACKNOWLEDGE); - + final MessageConsumer consumer1 = consumerSession1.createConsumer(destination); final MessageConsumer consumer2 = consumerSession2.createConsumer(destination); - + produceMessage(producerSession, destination); produceMessage(producerSession, destination); - + final Vector receivedMessages = new Vector(); final CountDownLatch commitDoneLatch = new CountDownLatch(1); final AtomicBoolean gotTransactionRolledBackException = new AtomicBoolean(false); - Executors.newSingleThreadExecutor().execute(new Runnable() { + Executors.newSingleThreadExecutor().execute(new Runnable() { public void run() { LOG.info("doing async commit after consume..."); try { Message msg = consumer1.receive(20000); LOG.info("consumer1 first attempt got message: " + msg); receivedMessages.add(msg); - + // give some variance to the runs TimeUnit.SECONDS.sleep(pauseSeconds * 2); - + // should not get a second message as there are two messages and two consumers // and prefetch=1, but with failover and unordered connection restore it can get the second // message. - + // For the transaction to complete it needs to get the same one or two messages // again so that the acks line up. // If redelivery order is different, the commit should fail with an ex @@ -686,7 +639,7 @@ public class FailoverTransactionTest { if (msg != null) { receivedMessages.add(msg); } - + LOG.info("committing consumer1 session: " + receivedMessages.size() + " messsage(s)"); try { consumerSession1.commit(); @@ -698,7 +651,7 @@ public class FailoverTransactionTest { } else { throw expectedSometimes; } - + } commitDoneLatch.countDown(); LOG.info("done async commit"); @@ -707,18 +660,18 @@ public class FailoverTransactionTest { } } }); - - + + // will be stopped by the plugin broker.waitUntilStopped(); broker = createBroker(false); - setPersistenceAdapter(adapter); + setDefaultPersistenceAdapter(broker); broker.start(); assertTrue("tx committed trough failover", commitDoneLatch.await(30, TimeUnit.SECONDS)); - + LOG.info("received message count: " + receivedMessages.size()); - + // new transaction Message msg = consumer1.receive(gotTransactionRolledBackException.get() ? 5000 : 20000); LOG.info("post: from consumer1 received: " + msg); @@ -728,7 +681,7 @@ public class FailoverTransactionTest { assertNull("should be nothing left for consumer as recieve should have committed", msg); } consumerSession1.commit(); - + if (gotTransactionRolledBackException.get() || !gotTransactionRolledBackException.get() && receivedMessages.size() == 1) { // just one message successfully consumed or none consumed @@ -738,20 +691,20 @@ public class FailoverTransactionTest { assertNotNull("got second message on consumer2", msg); consumerSession2.commit(); } - - for (Connection c: connections) { + + for (Connection c : connections) { c.close(); } - + // ensure no dangling messages with fresh broker etc broker.stop(); broker.waitUntilStopped(); - + LOG.info("Checking for remaining/hung messages.."); broker = createBroker(false); - setPersistenceAdapter(adapter); + setDefaultPersistenceAdapter(broker); broker.start(); - + // after restart, ensure no dangling messages cf = new ActiveMQConnectionFactory("failover:(" + url + ")"); connection = cf.createConnection(); @@ -767,7 +720,6 @@ public class FailoverTransactionTest { connection.close(); } - @Test public void testAutoRollbackWithMissingRedeliveries() throws Exception { broker = createBroker(true); broker.start(); @@ -778,34 +730,33 @@ public class FailoverTransactionTest { final Queue destination = producerSession.createQueue(QUEUE_NAME + "?consumer.prefetchSize=1"); final Session consumerSession = connection.createSession(true, Session.SESSION_TRANSACTED); MessageConsumer consumer = consumerSession.createConsumer(destination); - + produceMessage(producerSession, destination); - + Message msg = consumer.receive(20000); assertNotNull(msg); - + broker.stop(); broker = createBroker(false); // use empty jdbc store so that default wait(0) for redeliveries will timeout after failover - setPersistenceAdapter(1); + setPersistenceAdapter(broker, PersistenceAdapterChoice.JDBC); broker.start(); - + try { consumerSession.commit(); fail("expected transaciton rolledback ex"); } catch (TransactionRolledBackException expected) { } - - broker.stop(); + + broker.stop(); broker = createBroker(false); broker.start(); - + assertNotNull("should get rolledback message from original restarted broker", consumer.receive(20000)); connection.close(); } - - @Test + public void testWaitForMissingRedeliveries() throws Exception { LOG.info("testWaitForMissingRedeliveries()"); broker = createBroker(true); @@ -817,23 +768,23 @@ public class FailoverTransactionTest { final Queue destination = producerSession.createQueue(QUEUE_NAME); final Session consumerSession = connection.createSession(true, Session.SESSION_TRANSACTED); MessageConsumer consumer = consumerSession.createConsumer(destination); - + produceMessage(producerSession, destination); Message msg = consumer.receive(20000); if (msg == null) { AutoFailTestSupport.dumpAllThreads("missing-"); } assertNotNull("got message just produced", msg); - + broker.stop(); broker = createBroker(false); // use empty jdbc store so that wait for re-deliveries occur when failover resumes - setPersistenceAdapter(1); + setPersistenceAdapter(broker, PersistenceAdapterChoice.JDBC); broker.start(); final CountDownLatch commitDone = new CountDownLatch(1); // will block pending re-deliveries - Executors.newSingleThreadExecutor().execute(new Runnable() { + Executors.newSingleThreadExecutor().execute(new Runnable() { public void run() { LOG.info("doing async commit..."); try { @@ -843,19 +794,18 @@ public class FailoverTransactionTest { } } }); - - broker.stop(); + + broker.stop(); broker = createBroker(false); broker.start(); - - assertTrue("commit was successfull", commitDone.await(30, TimeUnit.SECONDS)); - + + assertTrue("commit was successful", commitDone.await(30, TimeUnit.SECONDS)); + assertNull("should not get committed message", consumer.receive(5000)); connection.close(); } - - @Test + public void testPoisonOnDeliveryWhilePending() throws Exception { LOG.info("testWaitForMissingRedeliveries()"); broker = createBroker(true); @@ -867,25 +817,25 @@ public class FailoverTransactionTest { final Queue destination = producerSession.createQueue(QUEUE_NAME + "?consumer.prefetchSize=0"); final Session consumerSession = connection.createSession(true, Session.SESSION_TRANSACTED); MessageConsumer consumer = consumerSession.createConsumer(destination); - + produceMessage(producerSession, destination); Message msg = consumer.receive(20000); if (msg == null) { AutoFailTestSupport.dumpAllThreads("missing-"); } assertNotNull("got message just produced", msg); - - broker.stop(); + + broker.stop(); broker = createBroker(false); broker.start(); final CountDownLatch commitDone = new CountDownLatch(1); - + // with prefetch=0, it will not get redelivered as there will not be another receive // for this consumer. so it will block till it timeout with an exception // will block pending re-deliveries - Executors.newSingleThreadExecutor().execute(new Runnable() { + Executors.newSingleThreadExecutor().execute(new Runnable() { public void run() { LOG.info("doing async commit..."); try { @@ -895,30 +845,30 @@ public class FailoverTransactionTest { } } }); - + // pull the pending message to this consumer where it will be poison as it is a duplicate without a tx MessageConsumer consumer2 = consumerSession.createConsumer(consumerSession.createQueue(QUEUE_NAME + "?consumer.prefetchSize=1")); assertNull("consumer2 not get a message while pending to 1", consumer2.receive(2000)); - + assertTrue("commit completed with ex", commitDone.await(15, TimeUnit.SECONDS)); assertNull("consumer should not get rolledback and non redelivered message", consumer.receive(5000)); - + // message should be in dlq MessageConsumer dlqConsumer = consumerSession.createConsumer(consumerSession.createQueue("ActiveMQ.DLQ")); TextMessage dlqMessage = (TextMessage) dlqConsumer.receive(5000); assertNotNull("found message in dlq", dlqMessage); assertEquals("text matches", "Test message", dlqMessage.getText()); consumerSession.commit(); - + connection.close(); } private void produceMessage(final Session producerSession, Queue destination) throws JMSException { - MessageProducer producer = producerSession.createProducer(destination); + MessageProducer producer = producerSession.createProducer(destination); TextMessage message = producerSession.createTextMessage("Test message"); producer.send(message); producer.close(); } - + }