From 50a98e3b0e078a5f8be03ef2e75ae0745a40e719 Mon Sep 17 00:00:00 2001 From: Gary Tully Date: Mon, 15 Dec 2008 18:48:51 +0000 Subject: [PATCH] fix - AMQ-2034 - have close in XA transaction deferred to synchronisation after completion, have rollback call beforeEnd to propagate acknowledgements; add a bunch of tests git-svn-id: https://svn.apache.org/repos/asf/activemq/trunk@726764 13f79535-47bb-0310-9956-ffa450edef68 --- .../activemq/ActiveMQMessageConsumer.java | 34 +++- .../apache/activemq/ActiveMQXASession.java | 19 +++ .../apache/activemq/TransactionContext.java | 1 + .../ActiveMQXAConnectionFactoryTest.java | 156 ++++++++++++++++++ .../org/apache/activemq/JMSConsumerTest.java | 30 ++++ .../activemq/JmsQueueTransactionTest.java | 3 +- .../CloseRollbackRedeliveryQueueTest.java | 41 ++++- .../java/org/apache/activemq/ra/MDBTest.java | 90 ++++++++++ 8 files changed, 364 insertions(+), 10 deletions(-) diff --git a/activemq-core/src/main/java/org/apache/activemq/ActiveMQMessageConsumer.java b/activemq-core/src/main/java/org/apache/activemq/ActiveMQMessageConsumer.java index 9bd59a559d..db825c2588 100755 --- a/activemq-core/src/main/java/org/apache/activemq/ActiveMQMessageConsumer.java +++ b/activemq-core/src/main/java/org/apache/activemq/ActiveMQMessageConsumer.java @@ -590,11 +590,27 @@ public class ActiveMQMessageConsumer implements MessageAvailableConsumer, StatsC */ public void close() throws JMSException { if (!unconsumedMessages.isClosed()) { - dispose(); - this.session.asyncSendPacket(info.createRemoveCommand()); + if (session.isTransacted() && session.getTransactionContext().getTransactionId() != null) { + session.getTransactionContext().addSynchronization(new Synchronization() { + public void afterCommit() throws Exception { + doClose(); + } + + public void afterRollback() throws Exception { + doClose(); + } + }); + } else { + doClose(); + } } } + void doClose() throws JMSException { + dispose(); + this.session.asyncSendPacket(info.createRemoveCommand()); + } + void clearMessagesInProgress() { // we are called from inside the transport reconnection logic // which involves us clearing all the connections' consumers @@ -653,10 +669,14 @@ public class ActiveMQMessageConsumer implements MessageAvailableConsumer, StatsC // } // Do we have any acks we need to send out before closing? - // Ack any delivered messages now. (session may still - // commit/rollback the acks). + // Ack any delivered messages now. // only processes optimized acknowledgements - deliverAcks(); + if (!session.isTransacted()) { + deliverAcks(); + if (session.isDupsOkAcknowledge()) { + acknowledge(); + } + } if (executorService != null) { executorService.shutdown(); try { @@ -665,9 +685,7 @@ public class ActiveMQMessageConsumer implements MessageAvailableConsumer, StatsC Thread.currentThread().interrupt(); } } - if (session.isTransacted() || session.isDupsOkAcknowledge()) { - acknowledge(); - } + if (session.isClientAcknowledge()) { if (!this.info.isBrowser()) { // rollback duplicates that aren't acknowledged diff --git a/activemq-core/src/main/java/org/apache/activemq/ActiveMQXASession.java b/activemq-core/src/main/java/org/apache/activemq/ActiveMQXASession.java index c4ba17dbde..ed8e541e6c 100755 --- a/activemq-core/src/main/java/org/apache/activemq/ActiveMQXASession.java +++ b/activemq-core/src/main/java/org/apache/activemq/ActiveMQXASession.java @@ -27,6 +27,7 @@ import javax.jms.XATopicSession; import javax.transaction.xa.XAResource; import org.apache.activemq.command.SessionId; +import org.apache.activemq.transaction.Synchronization; /** * The XASession interface extends the capability of Session by adding access @@ -96,6 +97,24 @@ public class ActiveMQXASession extends ActiveMQSession implements QueueSession, return new ActiveMQTopicSession(this); } + @Override + public void close() throws JMSException { + if (getTransactionContext().isInXATransaction()) { + getTransactionContext().addSynchronization(new Synchronization() { + public void afterCommit() throws Exception { + doClose(); + } + + public void afterRollback() throws Exception { + doClose(); + } + }); + } + } + + void doClose() throws JMSException { + super.close(); + } /** * This is called before transacted work is done by * the session. XA Work can only be done when this diff --git a/activemq-core/src/main/java/org/apache/activemq/TransactionContext.java b/activemq-core/src/main/java/org/apache/activemq/TransactionContext.java index 158be0e8df..c007f493ed 100755 --- a/activemq-core/src/main/java/org/apache/activemq/TransactionContext.java +++ b/activemq-core/src/main/java/org/apache/activemq/TransactionContext.java @@ -224,6 +224,7 @@ public class TransactionContext implements XAResource { throw new TransactionInProgressException("Cannot rollback() if an XA transaction is already in progress "); } + beforeEnd(); if (transactionId != null) { TransactionInfo info = new TransactionInfo(getConnectionId(), transactionId, TransactionInfo.ROLLBACK); this.transactionId = null; diff --git a/activemq-core/src/test/java/org/apache/activemq/ActiveMQXAConnectionFactoryTest.java b/activemq-core/src/test/java/org/apache/activemq/ActiveMQXAConnectionFactoryTest.java index 1195708494..e762467d90 100644 --- a/activemq-core/src/test/java/org/apache/activemq/ActiveMQXAConnectionFactoryTest.java +++ b/activemq-core/src/test/java/org/apache/activemq/ActiveMQXAConnectionFactoryTest.java @@ -16,27 +16,39 @@ */ package org.apache.activemq; +import java.io.ByteArrayOutputStream; +import java.io.DataOutputStream; +import java.io.IOException; import java.net.URI; import java.net.URISyntaxException; import javax.jms.Connection; +import javax.jms.Destination; import javax.jms.JMSException; +import javax.jms.Message; +import javax.jms.MessageConsumer; +import javax.jms.MessageProducer; +import javax.jms.TextMessage; import javax.jms.XAConnection; import javax.jms.XAQueueConnection; import javax.jms.XASession; import javax.jms.XATopicConnection; import javax.transaction.xa.XAException; import javax.transaction.xa.XAResource; +import javax.transaction.xa.Xid; import org.apache.activemq.broker.BrokerRegistry; import org.apache.activemq.broker.BrokerService; import org.apache.activemq.broker.TransportConnector; +import org.apache.activemq.command.ActiveMQQueue; +import org.apache.activemq.command.ActiveMQTextMessage; import org.apache.activemq.transport.stomp.StompTransportFilter; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; public class ActiveMQXAConnectionFactoryTest extends CombinationTestSupport { private static final Log LOG = LogFactory.getLog(ActiveMQXAConnectionFactoryTest.class); + long txGenerator = System.currentTimeMillis(); public void testCopy() throws URISyntaxException, JMSException { ActiveMQXAConnectionFactory cf = new ActiveMQXAConnectionFactory("vm://localhost?"); @@ -117,6 +129,126 @@ public class ActiveMQXAConnectionFactoryTest extends CombinationTestSupport { connection2.close(); } + public void testVanilaTransactionalProduceReceive() throws Exception { + + ActiveMQXAConnectionFactory cf1 = new ActiveMQXAConnectionFactory("vm://localhost?broker.persistent=false"); + XAConnection connection1 = (XAConnection)cf1.createConnection(); + connection1.start(); + XASession session = connection1.createXASession(); + XAResource resource = session.getXAResource(); + Destination dest = new ActiveMQQueue(getName()); + + // publish a message + Xid tid = createXid(); + resource.start(tid, XAResource.TMNOFLAGS); + MessageProducer producer = session.createProducer(dest); + ActiveMQTextMessage message = new ActiveMQTextMessage(); + message.setText(getName()); + producer.send(message); + resource.end(tid, XAResource.TMSUCCESS); + resource.commit(tid, true); + session.close(); + + session = connection1.createXASession(); + MessageConsumer consumer = session.createConsumer(dest); + tid = createXid(); + resource = session.getXAResource(); + resource.start(tid, XAResource.TMNOFLAGS); + TextMessage receivedMessage = (TextMessage) consumer.receive(1000); + assertNotNull(receivedMessage); + assertEquals(getName(), receivedMessage.getText()); + resource.end(tid, XAResource.TMSUCCESS); + resource.commit(tid, true); + } + + public void testConsumerCloseTransactionalSendReceive() throws Exception { + + ActiveMQXAConnectionFactory cf1 = new ActiveMQXAConnectionFactory("vm://localhost?broker.persistent=false"); + XAConnection connection1 = (XAConnection)cf1.createConnection(); + connection1.start(); + XASession session = connection1.createXASession(); + XAResource resource = session.getXAResource(); + Destination dest = new ActiveMQQueue(getName()); + + // publish a message + Xid tid = createXid(); + resource.start(tid, XAResource.TMNOFLAGS); + MessageProducer producer = session.createProducer(dest); + ActiveMQTextMessage message = new ActiveMQTextMessage(); + message.setText(getName()); + producer.send(message); + producer.close(); + resource.end(tid, XAResource.TMSUCCESS); + resource.commit(tid, true); + session.close(); + + session = connection1.createXASession(); + MessageConsumer consumer = session.createConsumer(dest); + tid = createXid(); + resource = session.getXAResource(); + resource.start(tid, XAResource.TMNOFLAGS); + TextMessage receivedMessage = (TextMessage) consumer.receive(1000); + consumer.close(); + assertNotNull(receivedMessage); + assertEquals(getName(), receivedMessage.getText()); + resource.end(tid, XAResource.TMSUCCESS); + resource.commit(tid, true); + + session = connection1.createXASession(); + consumer = session.createConsumer(dest); + tid = createXid(); + resource = session.getXAResource(); + resource.start(tid, XAResource.TMNOFLAGS); + assertNull(consumer.receive(1000)); + resource.end(tid, XAResource.TMSUCCESS); + resource.commit(tid, true); + + } + + public void testSessionCloseTransactionalSendReceive() throws Exception { + + ActiveMQXAConnectionFactory cf1 = new ActiveMQXAConnectionFactory("vm://localhost?broker.persistent=false"); + XAConnection connection1 = (XAConnection)cf1.createConnection(); + connection1.start(); + XASession session = connection1.createXASession(); + XAResource resource = session.getXAResource(); + Destination dest = new ActiveMQQueue(getName()); + + // publish a message + Xid tid = createXid(); + resource.start(tid, XAResource.TMNOFLAGS); + MessageProducer producer = session.createProducer(dest); + ActiveMQTextMessage message = new ActiveMQTextMessage(); + message.setText(getName()); + producer.send(message); + session.close(); + resource.end(tid, XAResource.TMSUCCESS); + resource.commit(tid, true); + + + session = connection1.createXASession(); + MessageConsumer consumer = session.createConsumer(dest); + tid = createXid(); + resource = session.getXAResource(); + resource.start(tid, XAResource.TMNOFLAGS); + TextMessage receivedMessage = (TextMessage) consumer.receive(1000); + session.close(); + assertNotNull(receivedMessage); + assertEquals(getName(), receivedMessage.getText()); + resource.end(tid, XAResource.TMSUCCESS); + resource.commit(tid, true); + + session = connection1.createXASession(); + consumer = session.createConsumer(dest); + tid = createXid(); + resource = session.getXAResource(); + resource.start(tid, XAResource.TMNOFLAGS); + assertNull(consumer.receive(1000)); + resource.end(tid, XAResource.TMSUCCESS); + resource.commit(tid, true); + } + + protected void assertCreateConnection(String uri) throws Exception { // Start up a broker with a tcp connector. BrokerService broker = new BrokerService(); @@ -161,5 +293,29 @@ public class ActiveMQXAConnectionFactoryTest extends CombinationTestSupport { assertTrue("Should be an XATopicConnection", connection instanceof XATopicConnection); assertTrue("Should be an XAQueueConnection", connection instanceof XAQueueConnection); } + + public Xid createXid() throws IOException { + + ByteArrayOutputStream baos = new ByteArrayOutputStream(); + DataOutputStream os = new DataOutputStream(baos); + os.writeLong(++txGenerator); + os.close(); + final byte[] bs = baos.toByteArray(); + + return new Xid() { + public int getFormatId() { + return 86; + } + + public byte[] getGlobalTransactionId() { + return bs; + } + + public byte[] getBranchQualifier() { + return bs; + } + }; + + } } diff --git a/activemq-core/src/test/java/org/apache/activemq/JMSConsumerTest.java b/activemq-core/src/test/java/org/apache/activemq/JMSConsumerTest.java index ccafcc8bd4..6939ffa247 100755 --- a/activemq-core/src/test/java/org/apache/activemq/JMSConsumerTest.java +++ b/activemq-core/src/test/java/org/apache/activemq/JMSConsumerTest.java @@ -652,5 +652,35 @@ public class JMSConsumerTest extends JmsTestSupport { assertNull(redispatchConsumer.receive(500)); redispatchSession.close(); } + + public void testRedispatchOfRolledbackTx() throws Exception { + + connection.start(); + Session session = connection.createSession(true, Session.SESSION_TRANSACTED); + destination = createDestination(session, ActiveMQDestination.QUEUE_TYPE); + + sendMessages(connection, destination, 1); + + MessageConsumer consumer = session.createConsumer(destination); + assertNotNull(consumer.receive(1000)); + + // install another consumer while message dispatch is unacked/uncommitted + Session redispatchSession = connection.createSession(true, Session.SESSION_TRANSACTED); + MessageConsumer redispatchConsumer = redispatchSession.createConsumer(destination); + + session.rollback(); + session.close(); + + Message msg = redispatchConsumer.receive(1000); + assertNotNull(msg); + assertTrue(msg.getJMSRedelivered()); + // should have re-delivery of 2, one for re-dispatch, one for rollback which is a little too much! + assertEquals(3, msg.getLongProperty("JMSXDeliveryCount")); + redispatchSession.commit(); + + assertNull(redispatchConsumer.receive(500)); + redispatchSession.close(); + } + } diff --git a/activemq-core/src/test/java/org/apache/activemq/JmsQueueTransactionTest.java b/activemq-core/src/test/java/org/apache/activemq/JmsQueueTransactionTest.java index d523008585..2fbc06d030 100755 --- a/activemq-core/src/test/java/org/apache/activemq/JmsQueueTransactionTest.java +++ b/activemq-core/src/test/java/org/apache/activemq/JmsQueueTransactionTest.java @@ -157,7 +157,8 @@ public class JmsQueueTransactionTest extends JmsTransactionTestSupport { // Get the first. assertEquals(outbound[0], consumer.receive(1000)); consumer.close(); - + session.commit(); + QueueBrowser browser = session.createBrowser((Queue)destination); Enumeration enumeration = browser.getEnumeration(); diff --git a/activemq-core/src/test/java/org/apache/activemq/test/rollback/CloseRollbackRedeliveryQueueTest.java b/activemq-core/src/test/java/org/apache/activemq/test/rollback/CloseRollbackRedeliveryQueueTest.java index f7099deef6..2d6e812573 100644 --- a/activemq-core/src/test/java/org/apache/activemq/test/rollback/CloseRollbackRedeliveryQueueTest.java +++ b/activemq-core/src/test/java/org/apache/activemq/test/rollback/CloseRollbackRedeliveryQueueTest.java @@ -37,7 +37,7 @@ public class CloseRollbackRedeliveryQueueTest extends EmbeddedBrokerTestSupport protected int numberOfMessagesOnQueue = 1; private Connection connection; - public void testVerifyCloseRedeliveryWithFailoverTransport() throws Throwable { + public void testVerifySessionCloseRedeliveryWithFailoverTransport() throws Throwable { Session session = connection.createSession(true, Session.SESSION_TRANSACTED); MessageConsumer consumer = session.createConsumer(destination); @@ -57,7 +57,46 @@ public class CloseRollbackRedeliveryQueueTest extends EmbeddedBrokerTestSupport assertEquals("redelivered message", id, message.getJMSMessageID()); assertEquals(3, message.getLongProperty("JMSXDeliveryCount")); } + + public void testVerifyConsumerAndSessionCloseRedeliveryWithFailoverTransport() throws Throwable { + Session session = connection.createSession(true, Session.SESSION_TRANSACTED); + MessageConsumer consumer = session.createConsumer(destination); + Message message = consumer.receive(1000); + String id = message.getJMSMessageID(); + assertNotNull(message); + LOG.info("got message " + message); + consumer.close(); + session.close(); + session = connection.createSession(true, Session.SESSION_TRANSACTED); + consumer = session.createConsumer(destination); + + message = consumer.receive(1000); + session.commit(); + assertNotNull(message); + assertEquals("redelivered message", id, message.getJMSMessageID()); + assertEquals(3, message.getLongProperty("JMSXDeliveryCount")); + } + + public void testVerifyConsumerCloseSessionRollbackRedeliveryWithFailoverTransport() throws Throwable { + Session session = connection.createSession(true, Session.SESSION_TRANSACTED); + MessageConsumer consumer = session.createConsumer(destination); + + Message message = consumer.receive(1000); + String id = message.getJMSMessageID(); + assertNotNull(message); + LOG.info("got message " + message); + consumer.close(); + session.rollback(); + + consumer = session.createConsumer(destination); + message = consumer.receive(1000); + session.commit(); + assertNotNull(message); + assertEquals("redelivered message", id, message.getJMSMessageID()); + assertEquals(3, message.getLongProperty("JMSXDeliveryCount")); + } + protected void setUp() throws Exception { super.setUp(); diff --git a/activemq-ra/src/test/java/org/apache/activemq/ra/MDBTest.java b/activemq-ra/src/test/java/org/apache/activemq/ra/MDBTest.java index 8c4c19109a..9a357bd471 100644 --- a/activemq-ra/src/test/java/org/apache/activemq/ra/MDBTest.java +++ b/activemq-ra/src/test/java/org/apache/activemq/ra/MDBTest.java @@ -25,7 +25,9 @@ import java.util.concurrent.CountDownLatch; import java.util.concurrent.TimeUnit; import javax.jms.Connection; +import javax.jms.JMSException; import javax.jms.Message; +import javax.jms.MessageConsumer; import javax.jms.MessageListener; import javax.jms.MessageProducer; import javax.jms.Queue; @@ -188,6 +190,94 @@ public class MDBTest extends TestCase { } + public void testMessageExceptionReDelivery() throws Exception { + + ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory("vm://localhost?broker.persistent=false"); + Connection connection = factory.createConnection(); + Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); + + ActiveMQResourceAdapter adapter = new ActiveMQResourceAdapter(); + adapter.setServerUrl("vm://localhost?broker.persistent=false"); + adapter.start(new StubBootstrapContext()); + + final CountDownLatch messageDelivered = new CountDownLatch(2); + + final StubMessageEndpoint endpoint = new StubMessageEndpoint() { + public void onMessage(Message message) { + super.onMessage(message); + try { + messageDelivered.countDown(); + if (!messageDelivered.await(1, TimeUnit.MILLISECONDS)) { + throw new RuntimeException(getName() + " ex on first delivery"); + } else { + try { + assertTrue(message.getJMSRedelivered()); + } catch (JMSException e) { + e.printStackTrace(); + } + } + } catch (InterruptedException ignored) { + } + }; + + public void afterDelivery() throws ResourceException { + try { + if (!messageDelivered.await(1, TimeUnit.MILLISECONDS)) { + xaresource.end(xid, XAResource.TMFAIL); + xaresource.rollback(xid); + } else { + xaresource.end(xid, XAResource.TMSUCCESS); + xaresource.prepare(xid); + xaresource.commit(xid, false); + } + } catch (Throwable e) { + throw new ResourceException(e); + } + } + }; + + ActiveMQActivationSpec activationSpec = new ActiveMQActivationSpec(); + activationSpec.setDestinationType(Queue.class.getName()); + activationSpec.setDestination("TEST"); + activationSpec.setResourceAdapter(adapter); + activationSpec.validate(); + + MessageEndpointFactory messageEndpointFactory = new MessageEndpointFactory() { + public MessageEndpoint createEndpoint(XAResource resource) throws UnavailableException { + endpoint.xaresource = resource; + return endpoint; + } + + public boolean isDeliveryTransacted(Method method) throws NoSuchMethodException { + return true; + } + }; + + // Activate an Endpoint + adapter.endpointActivation(messageEndpointFactory, activationSpec); + + // Give endpoint a chance to setup and register its listeners + try { + Thread.sleep(1000); + } catch (Exception e) { + + } + + // Send the broker a message to that endpoint + MessageProducer producer = session.createProducer(new ActiveMQQueue("TEST")); + producer.send(session.createTextMessage("Hello!")); + connection.close(); + + // Wait for the message to be delivered twice. + assertTrue(messageDelivered.await(10000, TimeUnit.MILLISECONDS)); + + // Shut the Endpoint down. + adapter.endpointDeactivation(messageEndpointFactory, activationSpec); + adapter.stop(); + + } + + public Xid createXid() throws IOException { ByteArrayOutputStream baos = new ByteArrayOutputStream(); DataOutputStream os = new DataOutputStream(baos);