diff --git a/activemq-broker/src/main/java/org/apache/activemq/broker/region/DurableTopicSubscription.java b/activemq-broker/src/main/java/org/apache/activemq/broker/region/DurableTopicSubscription.java index 22df07aea2..6a5c599023 100644 --- a/activemq-broker/src/main/java/org/apache/activemq/broker/region/DurableTopicSubscription.java +++ b/activemq-broker/src/main/java/org/apache/activemq/broker/region/DurableTopicSubscription.java @@ -322,12 +322,14 @@ public class DurableTopicSubscription extends PrefetchSubscription implements Us @Override protected boolean canDispatch(MessageReference node) { - if (!ackedAndPrepared.isEmpty() && ackedAndPrepared.contains(node.getMessageId())) { - return false; // prepared ack - } return true; // let them go, our dispatchPending gates the active / inactive state. } + @Override + protected boolean trackedInPendingTransaction(MessageReference node) { + return !ackedAndPrepared.isEmpty() && ackedAndPrepared.contains(node.getMessageId()); + } + @Override protected void acknowledge(ConnectionContext context, MessageAck ack, final MessageReference node) throws IOException { this.setTimeOfLastMessageAck(System.currentTimeMillis()); @@ -349,6 +351,8 @@ public class DurableTopicSubscription extends PrefetchSubscription implements Us @Override public void afterCommit() throws Exception { synchronized (pendingLock) { + // may be in the cursor post activate/load from the store + pending.remove(node); ackedAndPrepared.remove(node.getMessageId()); } } @@ -357,7 +361,6 @@ public class DurableTopicSubscription extends PrefetchSubscription implements Us public void afterRollback() throws Exception { synchronized (pendingLock) { ackedAndPrepared.remove(node.getMessageId()); - pending.addMessageFirst(node); } dispatchPending(); } diff --git a/activemq-broker/src/main/java/org/apache/activemq/broker/region/PrefetchSubscription.java b/activemq-broker/src/main/java/org/apache/activemq/broker/region/PrefetchSubscription.java index a3b9a4d980..f479923880 100644 --- a/activemq-broker/src/main/java/org/apache/activemq/broker/region/PrefetchSubscription.java +++ b/activemq-broker/src/main/java/org/apache/activemq/broker/region/PrefetchSubscription.java @@ -641,6 +641,10 @@ public abstract class PrefetchSubscription extends AbstractSubscription { if (node == null) { break; } + if (trackedInPendingTransaction(node)) { + node.decrementReferenceCount(); + continue; + } // Synchronize between dispatched list and remove of message from pending list // related to remove subscription action @@ -685,6 +689,10 @@ public abstract class PrefetchSubscription extends AbstractSubscription { } } + protected boolean trackedInPendingTransaction(MessageReference node) { + return false; + } + protected void setPendingBatchSize(PendingMessageCursor pending, int numberToDispatch) { pending.setMaxBatchSize(numberToDispatch); } diff --git a/activemq-unit-tests/src/test/java/org/apache/activemq/broker/BrokerTestSupport.java b/activemq-unit-tests/src/test/java/org/apache/activemq/broker/BrokerTestSupport.java index d84ff279b3..44f325bbec 100644 --- a/activemq-unit-tests/src/test/java/org/apache/activemq/broker/BrokerTestSupport.java +++ b/activemq-unit-tests/src/test/java/org/apache/activemq/broker/BrokerTestSupport.java @@ -217,6 +217,11 @@ public class BrokerTestSupport extends CombinationTestSupport { return info; } + protected TransactionInfo createEndTransaction(ConnectionInfo connectionInfo, TransactionId txid) { + TransactionInfo info = new TransactionInfo(connectionInfo.getConnectionId(), txid, TransactionInfo.END); + return info; + } + protected TransactionInfo createPrepareTransaction(ConnectionInfo connectionInfo, TransactionId txid) { TransactionInfo info = new TransactionInfo(connectionInfo.getConnectionId(), txid, TransactionInfo.PREPARE); return info; diff --git a/activemq-unit-tests/src/test/java/org/apache/activemq/broker/XARecoveryBrokerTest.java b/activemq-unit-tests/src/test/java/org/apache/activemq/broker/XARecoveryBrokerTest.java index 89745a9b64..8415b93c6e 100644 --- a/activemq-unit-tests/src/test/java/org/apache/activemq/broker/XARecoveryBrokerTest.java +++ b/activemq-unit-tests/src/test/java/org/apache/activemq/broker/XARecoveryBrokerTest.java @@ -1372,6 +1372,7 @@ public class XARecoveryBrokerTest extends BrokerRestartTestSupport { ack.setTransactionId(txid); connection.send(ack); + connection.request(createEndTransaction(connectionInfo, txid)); connection.request(createPrepareTransaction(connectionInfo, txid)); // reconnect, verify perpared acks unavailable @@ -1455,6 +1456,7 @@ public class XARecoveryBrokerTest extends BrokerRestartTestSupport { ack.setTransactionId(txid); connection.send(ack); + connection.request(createEndTransaction(connectionInfo, txid)); connection.request(createPrepareTransaction(connectionInfo, txid)); // reconnect, verify perpared acks unavailable @@ -1479,8 +1481,101 @@ public class XARecoveryBrokerTest extends BrokerRestartTestSupport { // commit original tx connection.request(createCommitTransaction2Phase(connectionInfo, txid)); + // verify still unavailable message = receiveMessage(connection, 2000); - assertNull("unexpected non null", message); + assertNull("unexpected non null: " + message, message); + + // unsubscribe + connection.request(consumerInfo.createRemoveCommand()); + RemoveSubscriptionInfo removeSubscriptionInfo = new RemoveSubscriptionInfo(); + removeSubscriptionInfo.setClientId(connectionInfo.getClientId()); + removeSubscriptionInfo.setSubscriptionName(consumerInfo.getSubscriptionName()); + connection.request(removeSubscriptionInfo); + } + + public void initCombosForTestNoDupOnRollbackRedelivery() { + addCombinationValues("keepDurableSubsActive", new Boolean[]{Boolean.FALSE, Boolean.TRUE}); + } + + public void testNoDupOnRollbackRedelivery() throws Exception { + + ActiveMQDestination destination = new ActiveMQTopic("TryTopic"); + + // Setup the producer and send the message. + StubConnection connection = createConnection(); + ConnectionInfo connectionInfo = createConnectionInfo(); + connectionInfo.setClientId("durable"); + SessionInfo sessionInfo = createSessionInfo(connectionInfo); + ProducerInfo producerInfo = createProducerInfo(sessionInfo); + connection.send(connectionInfo); + connection.send(sessionInfo); + connection.send(producerInfo); + + // setup durable subs + ConsumerInfo consumerInfo = createConsumerInfo(sessionInfo, destination); + consumerInfo.setSubscriptionName("durable"); + connection.send(consumerInfo); + + int numMessages = 1; + for (int i = 0; i < numMessages; i++) { + Message message = createMessage(producerInfo, destination); + message.setPersistent(true); + connection.send(message); + } + + // Begin the transaction. + XATransactionId txid = createXATransaction(sessionInfo); + connection.send(createBeginTransaction(connectionInfo, txid)); + + Message message = null; + for (int i = 0; i < numMessages; i++) { + message = receiveMessage(connection); + assertNotNull(message); + } + + // one ack with last received, mimic a beforeEnd synchronization + MessageAck ack = createAck(consumerInfo, message, numMessages, MessageAck.STANDARD_ACK_TYPE); + ack.setTransactionId(txid); + connection.send(ack); + + connection.request(createEndTransaction(connectionInfo, txid)); + connection.request(createRollbackTransaction(connectionInfo, txid)); + + connection.send(consumerInfo.createRemoveCommand()); + connection.send(sessionInfo.createRemoveCommand()); + connection.send(connectionInfo.createRemoveCommand()); + + + LOG.info("new connection/consumer for redelivery"); + + connection.request(closeConnectionInfo(connectionInfo)); + + connectionInfo = createConnectionInfo(); + connectionInfo.setClientId("durable"); + sessionInfo = createSessionInfo(connectionInfo); + connection.send(connectionInfo); + connection.send(sessionInfo); + + // setup durable subs + consumerInfo = createConsumerInfo(sessionInfo, destination); + consumerInfo.setSubscriptionName("durable"); + connection.send(consumerInfo); + + message = receiveMessage(connection); + assertNotNull(message); + + Message dup = receiveMessage(connection); + assertNull("no duplicate send: " + dup, dup); + + txid = createXATransaction(sessionInfo); + connection.send(createBeginTransaction(connectionInfo, txid)); + + ack = createAck(consumerInfo, message, numMessages, MessageAck.STANDARD_ACK_TYPE); + ack.setTransactionId(txid); + connection.send(ack); + + connection.request(createEndTransaction(connectionInfo, txid)); + connection.request(createCommitTransaction1Phase(connectionInfo, txid)); // unsubscribe connection.request(consumerInfo.createRemoveCommand()); diff --git a/activemq-unit-tests/src/test/java/org/apache/activemq/broker/mLevelDBXARecoveryBrokerTest.java b/activemq-unit-tests/src/test/java/org/apache/activemq/broker/mLevelDBXARecoveryBrokerTest.java index da17f9228d..7adb983a72 100644 --- a/activemq-unit-tests/src/test/java/org/apache/activemq/broker/mLevelDBXARecoveryBrokerTest.java +++ b/activemq-unit-tests/src/test/java/org/apache/activemq/broker/mLevelDBXARecoveryBrokerTest.java @@ -72,4 +72,6 @@ public class mLevelDBXARecoveryBrokerTest extends XARecoveryBrokerTest { public void testQueuePersistentPreparedAcksAvailableAfterRollback() throws Exception { // pending acks are not tracked in leveldb } + public void testTopicPersistentPreparedAcksUnavailableTillRollback() throws Exception { + } } diff --git a/activemq-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ7185Test.java b/activemq-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ7185Test.java new file mode 100644 index 0000000000..2acf962f8b --- /dev/null +++ b/activemq-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ7185Test.java @@ -0,0 +1,170 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.activemq.bugs; + +import org.apache.activemq.ActiveMQConnectionFactory; +import org.apache.activemq.ActiveMQXAConnectionFactory; +import org.apache.activemq.broker.BrokerService; +import org.apache.activemq.command.ActiveMQTextMessage; +import org.apache.activemq.command.ActiveMQTopic; +import org.junit.After; +import org.junit.Before; +import org.junit.Test; + +import javax.jms.*; +import javax.transaction.xa.XAResource; +import javax.transaction.xa.Xid; +import java.io.ByteArrayOutputStream; +import java.io.DataOutputStream; +import java.io.IOException; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNotNull; +import static org.junit.Assert.assertNull; + +public class AMQ7185Test +{ + private final String xaDestinationName = "DestinationXA"; + private BrokerService broker; + private String connectionUri; + private long txGenerator = System.currentTimeMillis(); + + private XAConnectionFactory xaConnectionFactory; + private ConnectionFactory connectionFactory; + + final Topic dest = new ActiveMQTopic(xaDestinationName); + + @Before + public void startBroker() throws Exception { + broker = new BrokerService(); + broker.setDeleteAllMessagesOnStartup(true); + broker.setPersistent(false); + broker.setAdvisorySupport(false); + broker.addConnector("tcp://0.0.0.0:0?trace=true"); + broker.start(); + broker.waitUntilStarted(); + + connectionUri = broker.getTransportConnectors().get(0).getPublishableConnectString(); + + connectionFactory = new ActiveMQConnectionFactory(connectionUri); + ((ActiveMQConnectionFactory) connectionFactory).setWatchTopicAdvisories(false); + // failover ensure audit is in play + xaConnectionFactory = new ActiveMQXAConnectionFactory("failover://" + connectionUri); + ((ActiveMQXAConnectionFactory) xaConnectionFactory).setWatchTopicAdvisories(false); + } + + @After + public void stopBroker() throws Exception { + broker.stop(); + broker.waitUntilStopped(); + } + + @Test + public void testRollbackRedeliveryNoDup() throws Exception { + + XAConnection xaConnection = xaConnectionFactory.createXAConnection(); + xaConnection.setClientID("cid0"); + xaConnection.start(); + XASession session = xaConnection.createXASession(); + TopicSubscriber consumer = session.createDurableSubscriber(dest, "sub"); + consumer.close(); + session.close(); + xaConnection.close(); + + publish(dest); + + Xid tid; + TextMessage receivedMessage; + xaConnection = xaConnectionFactory.createXAConnection(); + xaConnection.setClientID("cid0"); + xaConnection.start(); + session = xaConnection.createXASession(); + consumer = session.createDurableSubscriber(dest, "sub"); + + tid = createXid(); + XAResource resource = session.getXAResource(); + resource.start(tid, XAResource.TMNOFLAGS); + + receivedMessage = (TextMessage) consumer.receive(4000); + assertNotNull(receivedMessage); + resource.end(tid, XAResource.TMSUCCESS); + resource.rollback(tid); + consumer.close(); + session.close(); + xaConnection.close(); + + + // redelivery + xaConnection = xaConnectionFactory.createXAConnection(); + xaConnection.setClientID("cid0"); + xaConnection.start(); + session = xaConnection.createXASession(); + consumer = session.createDurableSubscriber(dest, "sub"); + + tid = createXid(); + resource = session.getXAResource(); + resource.start(tid, XAResource.TMNOFLAGS); + receivedMessage = (TextMessage) consumer.receive(1000); + assertNotNull(receivedMessage); + + // verify only one + receivedMessage = (TextMessage) consumer.receiveNoWait(); + assertNull(receivedMessage); + + resource.end(tid, XAResource.TMSUCCESS); + resource.commit(tid, true); + + consumer.close(); + session.close(); + xaConnection.close(); + + // assertNoMessageInDLQ + assertEquals("Only one enqueue", 1, broker.getAdminView().getTotalEnqueueCount()); + } + + private void publish(Topic dest) throws JMSException { + Connection connection = connectionFactory.createConnection(); + connection.start(); + Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); + session.createProducer(dest).send(new ActiveMQTextMessage()); + connection.close(); + } + + + public Xid createXid() throws IOException { + + ByteArrayOutputStream baos = new ByteArrayOutputStream(); + DataOutputStream os = new DataOutputStream(baos); + os.writeLong(++txGenerator); + os.close(); + final byte[] bs = baos.toByteArray(); + + return new Xid() { + public int getFormatId() { + return 86; + } + + public byte[] getGlobalTransactionId() { + return bs; + } + + public byte[] getBranchQualifier() { + return bs; + } + }; + } +}