diff --git a/activemq-core/src/main/java/org/apache/activemq/broker/BrokerService.java b/activemq-core/src/main/java/org/apache/activemq/broker/BrokerService.java index f152c8f308..b3ec513805 100644 --- a/activemq-core/src/main/java/org/apache/activemq/broker/BrokerService.java +++ b/activemq-core/src/main/java/org/apache/activemq/broker/BrokerService.java @@ -506,7 +506,7 @@ public class BrokerService implements Service { } public void stop() throws Exception { - if (!started.compareAndSet(true, false)) { + if (!started.get()) { return; } diff --git a/activemq-core/src/main/java/org/apache/activemq/state/ConnectionStateTracker.java b/activemq-core/src/main/java/org/apache/activemq/state/ConnectionStateTracker.java index 85b7162b94..1badeded5a 100755 --- a/activemq-core/src/main/java/org/apache/activemq/state/ConnectionStateTracker.java +++ b/activemq-core/src/main/java/org/apache/activemq/state/ConnectionStateTracker.java @@ -20,6 +20,7 @@ import java.io.IOException; import java.util.Iterator; import java.util.LinkedHashMap; import java.util.Map; +import java.util.Vector; import java.util.concurrent.ConcurrentHashMap; import org.apache.activemq.command.Command; @@ -139,11 +140,28 @@ public class ConnectionStateTracker extends CommandVisitorAdapter { } private void restoreTransactions(Transport transport, ConnectionState connectionState) throws IOException { + Vector toIgnore = new Vector(); for (TransactionState transactionState : connectionState.getTransactionStates()) { if (LOG.isDebugEnabled()) { LOG.debug("tx: " + transactionState.getId()); } + // ignore any empty (ack) transaction + if (transactionState.getCommands().size() == 2) { + Command lastCommand = transactionState.getCommands().get(1); + if (lastCommand instanceof TransactionInfo) { + TransactionInfo transactionInfo = (TransactionInfo) lastCommand; + if (transactionInfo.getType() == TransactionInfo.COMMIT_ONE_PHASE) { + if (LOG.isDebugEnabled()) { + LOG.debug("not replaying empty (ack) tx: " + transactionState.getId()); + } + toIgnore.add(lastCommand); + continue; + } + } + } + + // replay short lived producers that may have been involved in the transaction for (ProducerState producerState : transactionState.getProducerStates().values()) { if (LOG.isDebugEnabled()) { LOG.debug("tx replay producer :" + producerState.getInfo()); @@ -165,6 +183,13 @@ public class ConnectionStateTracker extends CommandVisitorAdapter { transport.oneway(producerState.getInfo().createRemoveCommand()); } } + + for (Command command: toIgnore) { + // respond to the outstanding commit + Response response = new Response(); + response.setCorrelationId(command.getCommandId()); + transport.getTransportListener().onCommand(response); + } } /** @@ -200,6 +225,9 @@ public class ConnectionStateTracker extends CommandVisitorAdapter { // Restore the session's consumers for (Iterator iter3 = sessionState.getConsumerStates().iterator(); iter3.hasNext();) { ConsumerState consumerState = (ConsumerState)iter3.next(); + if (LOG.isDebugEnabled()) { + LOG.debug("restore consumer: " + consumerState.getInfo().getConsumerId()); + } transport.oneway(consumerState.getInfo()); } } diff --git a/activemq-core/src/test/java/org/apache/activemq/transport/failover/FailoverConsumerOutstandingCommitTest.java b/activemq-core/src/test/java/org/apache/activemq/transport/failover/FailoverConsumerOutstandingCommitTest.java new file mode 100644 index 0000000000..24cd3622e5 --- /dev/null +++ b/activemq-core/src/test/java/org/apache/activemq/transport/failover/FailoverConsumerOutstandingCommitTest.java @@ -0,0 +1,179 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.activemq.transport.failover; + +import static org.junit.Assert.assertNotNull; +import static org.junit.Assert.assertTrue; + +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.Executors; +import java.util.concurrent.TimeUnit; + +import javax.jms.Destination; +import javax.jms.JMSException; +import javax.jms.Message; +import javax.jms.MessageConsumer; +import javax.jms.MessageListener; +import javax.jms.MessageProducer; +import javax.jms.Queue; +import javax.jms.Session; +import javax.jms.TextMessage; + +import org.apache.activemq.ActiveMQConnection; +import org.apache.activemq.ActiveMQConnectionFactory; +import org.apache.activemq.ActiveMQMessageConsumer; +import org.apache.activemq.ActiveMQMessageTransformation; +import org.apache.activemq.ActiveMQSession; +import org.apache.activemq.broker.BrokerPlugin; +import org.apache.activemq.broker.BrokerPluginSupport; +import org.apache.activemq.broker.BrokerService; +import org.apache.activemq.broker.ConnectionContext; +import org.apache.activemq.broker.region.policy.PolicyEntry; +import org.apache.activemq.broker.region.policy.PolicyMap; +import org.apache.activemq.command.ConsumerId; +import org.apache.activemq.command.SessionId; +import org.apache.activemq.command.TransactionId; +import org.apache.activemq.util.Wait; +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.junit.After; +import org.junit.Test; + +public class FailoverConsumerOutstandingCommitTest { + + private static final Log LOG = LogFactory.getLog(FailoverConsumerOutstandingCommitTest.class); + private static final String QUEUE_NAME = "FailoverWithOutstandingCommit"; + private String url = "tcp://localhost:61616"; + final int prefetch = 10; + BrokerService broker; + + public void startCleanBroker() throws Exception { + startBroker(true); + } + + @After + public void stopBroker() throws Exception { + if (broker != null) { + broker.stop(); + } + } + + public void startBroker(boolean deleteAllMessagesOnStartup) throws Exception { + broker = createBroker(deleteAllMessagesOnStartup); + broker.start(); + } + + public BrokerService createBroker(boolean deleteAllMessagesOnStartup) throws Exception { + broker = new BrokerService(); + broker.addConnector(url); + broker.setDeleteAllMessagesOnStartup(deleteAllMessagesOnStartup); + PolicyMap policyMap = new PolicyMap(); + PolicyEntry defaultEntry = new PolicyEntry(); + + // optimizedDispatche and sync dispatch ensure that the dispatch happens + // before the commit reply that the consumer.clearDispatchList is waiting for. + defaultEntry.setOptimizedDispatch(true); + policyMap.setDefaultEntry(defaultEntry); + broker.setDestinationPolicy(policyMap); + + return broker; + } + + @Test + public void testFailoverConsumerDups() throws Exception { + doTestFailoverConsumerDups(true); + } + + public void doTestFailoverConsumerDups(final boolean watchTopicAdvisories) throws Exception { + + broker = createBroker(true); + + broker.setPlugins(new BrokerPlugin[] { + new BrokerPluginSupport() { + @Override + public void commitTransaction(ConnectionContext context, + TransactionId xid, boolean onePhase) throws Exception { + // so commit will hang as if reply is lost + context.setDontSendReponse(true); + Executors.newSingleThreadExecutor().execute(new Runnable() { + public void run() { + LOG.info("Stopping broker before commit..."); + try { + broker.stop(); + } catch (Exception e) { + e.printStackTrace(); + } + } + }); + } + } + }); + broker.start(); + + ActiveMQConnectionFactory cf = new ActiveMQConnectionFactory("failover:(" + url + ")"); + cf.setWatchTopicAdvisories(watchTopicAdvisories); + cf.setDispatchAsync(false); + + final ActiveMQConnection connection = (ActiveMQConnection) cf.createConnection(); + connection.start(); + + final Session producerSession = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); + final Queue destination = producerSession.createQueue(QUEUE_NAME + "?jms.consumer.prefetch=" + prefetch); + + final Session consumerSession = connection.createSession(true, Session.AUTO_ACKNOWLEDGE); + + + final CountDownLatch commitDoneLatch = new CountDownLatch(1); + final MessageConsumer testConsumer = consumerSession.createConsumer(destination); + testConsumer.setMessageListener(new MessageListener() { + + public void onMessage(Message message) { + LOG.info("consume one and commit"); + + assertNotNull("got message", message); + try { + consumerSession.commit(); + } catch (JMSException e) { + e.printStackTrace(); + } + commitDoneLatch.countDown(); + LOG.info("done commit"); + } + }); + + produceMessage(producerSession, destination, prefetch * 2); + + // will be stopped by the plugin + broker.waitUntilStopped(); + broker = createBroker(false); + broker.start(); + + assertTrue("consumer added through failover", commitDoneLatch.await(20, TimeUnit.SECONDS)); + + connection.close(); + } + + private void produceMessage(final Session producerSession, Queue destination, long count) + throws JMSException { + MessageProducer producer = producerSession.createProducer(destination); + for (int i=0; i