From 8e83b7f1238e622ae0a8adaa0144897e67d56335 Mon Sep 17 00:00:00 2001 From: Bosanac Dejan Date: Fri, 8 Oct 2010 12:08:32 +0000 Subject: [PATCH] https://issues.apache.org/activemq/browse/AMQ-2950 - additional fix to support parallel transactions git-svn-id: https://svn.apache.org/repos/asf/activemq/trunk@1005794 13f79535-47bb-0310-9956-ffa450edef68 --- .../activemq/broker/TransactionBroker.java | 13 +- .../activemq/transaction/XATransaction.java | 9 +- activemq-spring/pom.xml | 23 +++ .../spring/ParallelXATransactionTest.java | 156 ++++++++++++++++++ .../src/test/resources/spring/xa.xml | 90 ++++++++++ 5 files changed, 284 insertions(+), 7 deletions(-) create mode 100644 activemq-spring/src/test/java/org/apache/activemq/spring/ParallelXATransactionTest.java create mode 100644 activemq-spring/src/test/resources/spring/xa.xml diff --git a/activemq-core/src/main/java/org/apache/activemq/broker/TransactionBroker.java b/activemq-core/src/main/java/org/apache/activemq/broker/TransactionBroker.java index e61d0ba264..26b7f54408 100755 --- a/activemq-core/src/main/java/org/apache/activemq/broker/TransactionBroker.java +++ b/activemq-core/src/main/java/org/apache/activemq/broker/TransactionBroker.java @@ -58,7 +58,7 @@ public class TransactionBroker extends BrokerFilter { // The prepared XA transactions. private TransactionStore transactionStore; - private Map xaTransactions = new LinkedHashMap(); + private Map xaTransactions = new LinkedHashMap(); private ActiveMQMessageAudit audit; public TransactionBroker(Broker next, TransactionStore transactionStore) { @@ -125,7 +125,7 @@ public class TransactionBroker extends BrokerFilter { public TransactionId[] getPreparedTransactions(ConnectionContext context) throws Exception { List txs = new ArrayList(); synchronized (xaTransactions) { - for (Iterator iter = xaTransactions.values().iterator(); iter.hasNext();) { + for (Iterator iter = xaTransactions.values().iterator(); iter.hasNext();) { Transaction tx = iter.next(); if (tx.isPrepared()) { if (LOG.isDebugEnabled()) { @@ -146,13 +146,13 @@ public class TransactionBroker extends BrokerFilter { public void beginTransaction(ConnectionContext context, TransactionId xid) throws Exception { // the transaction may have already been started. if (xid.isXATransaction()) { - Transaction transaction = null; + XATransaction transaction = null; synchronized (xaTransactions) { transaction = xaTransactions.get(xid); if (transaction != null) { return; } - transaction = new XATransaction(transactionStore, (XATransactionId)xid, this); + transaction = new XATransaction(transactionStore, (XATransactionId)xid, this, context.getConnectionId()); xaTransactions.put(xid, transaction); } } else { @@ -252,9 +252,10 @@ public class TransactionBroker extends BrokerFilter { iter.remove(); } - for (Transaction tx : xaTransactions.values()) { + + for (XATransaction tx : xaTransactions.values()) { try { - if (!tx.isPrepared()) { + if (tx.getConnectionId().equals(info.getConnectionId()) && !tx.isPrepared()) { tx.rollback(); } } catch (Exception e) { diff --git a/activemq-core/src/main/java/org/apache/activemq/transaction/XATransaction.java b/activemq-core/src/main/java/org/apache/activemq/transaction/XATransaction.java index d4bad7dae9..9fc3966240 100755 --- a/activemq-core/src/main/java/org/apache/activemq/transaction/XATransaction.java +++ b/activemq-core/src/main/java/org/apache/activemq/transaction/XATransaction.java @@ -20,6 +20,7 @@ import java.io.IOException; import javax.transaction.xa.XAException; import javax.transaction.xa.XAResource; import org.apache.activemq.broker.TransactionBroker; +import org.apache.activemq.command.ConnectionId; import org.apache.activemq.command.TransactionId; import org.apache.activemq.command.XATransactionId; import org.apache.activemq.store.TransactionStore; @@ -36,11 +37,13 @@ public class XATransaction extends Transaction { private final TransactionStore transactionStore; private final XATransactionId xid; private final TransactionBroker broker; + private final ConnectionId connectionId; - public XATransaction(TransactionStore transactionStore, XATransactionId xid, TransactionBroker broker) { + public XATransaction(TransactionStore transactionStore, XATransactionId xid, TransactionBroker broker, ConnectionId connectionId) { this.transactionStore = transactionStore; this.xid = xid; this.broker = broker; + this.connectionId = connectionId; if (LOG.isDebugEnabled()) { LOG.debug("XA Transaction new/begin : " + xid); } @@ -199,6 +202,10 @@ public class XATransaction extends Transaction { broker.removeTransaction(xid); } + public ConnectionId getConnectionId() { + return connectionId; + } + @Override public TransactionId getTransactionId() { return xid; diff --git a/activemq-spring/pom.xml b/activemq-spring/pom.xml index f25367671d..e5b0648da5 100755 --- a/activemq-spring/pom.xml +++ b/activemq-spring/pom.xml @@ -114,6 +114,29 @@ log4j test + + org.jencks + jencks + 2.2 + test + + + org.slf4j + slf4j-api + 1.4.3 + test + + + org.slf4j + slf4j-log4j12 + 1.4.3 + test + + + ${project.groupId} + activemq-ra + test + org.springframework.osgi spring-osgi-core diff --git a/activemq-spring/src/test/java/org/apache/activemq/spring/ParallelXATransactionTest.java b/activemq-spring/src/test/java/org/apache/activemq/spring/ParallelXATransactionTest.java new file mode 100644 index 0000000000..4bde46cb2c --- /dev/null +++ b/activemq-spring/src/test/java/org/apache/activemq/spring/ParallelXATransactionTest.java @@ -0,0 +1,156 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.activemq.spring; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.junit.Assert; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.springframework.jms.core.JmsTemplate; +import org.springframework.jms.core.MessageCreator; +import org.springframework.test.annotation.DirtiesContext; +import org.springframework.test.context.ContextConfiguration; +import org.springframework.test.context.junit4.SpringJUnit4ClassRunner; +import org.springframework.test.context.transaction.TransactionConfiguration; +import org.springframework.transaction.PlatformTransactionManager; +import org.springframework.transaction.TransactionException; +import org.springframework.transaction.TransactionStatus; +import org.springframework.transaction.support.TransactionCallbackWithoutResult; +import org.springframework.transaction.support.TransactionTemplate; + +import javax.annotation.Resource; +import javax.jms.JMSException; +import javax.jms.Message; +import javax.jms.Session; +import java.util.Arrays; + +@RunWith(SpringJUnit4ClassRunner.class) + +@ContextConfiguration(locations = {"classpath:spring/xa.xml"}) +@TransactionConfiguration(transactionManager = "transactionManager", defaultRollback = false) +public class ParallelXATransactionTest { + + private static final Log LOG = LogFactory.getLog(ParallelXATransactionTest.class); + + @Resource(name = "transactionManager") + PlatformTransactionManager txManager = null; + + @Resource(name = "transactionManager2") + PlatformTransactionManager txManager2 = null; + + + @Resource(name = "jmsTemplate") + JmsTemplate jmsTemplate = null; + + @Resource(name = "jmsTemplate2") + JmsTemplate jmsTemplate2 = null; + + + public static final int NB_MSG = 100; + public static final String BODY = Arrays.toString(new int[1024]); + private static final String[] QUEUES = {"TEST.queue1", "TEST.queue2", "TEST.queue3", "TEST.queue4", "TEST.queue5"}; + private static final String AUDIT = "TEST.audit"; + public static final int SLEEP = 500; + + @Test + @DirtiesContext + public void testParalellXaTx() throws Exception { + + + class ProducerThread extends Thread { + + PlatformTransactionManager txManager; + JmsTemplate jmsTemplate; + Exception lastException; + + + public ProducerThread(JmsTemplate jmsTemplate, PlatformTransactionManager txManager) { + this.jmsTemplate = jmsTemplate; + this.txManager = txManager; + } + + public void run() { + int i = 0; + while (i++ < 10) { + + try { + Thread.sleep((long) (Math.random() * SLEEP)); + } catch (InterruptedException e) { + e.printStackTrace(); + } + TransactionTemplate tt = new TransactionTemplate(this.txManager); + + + try { + tt.execute(new TransactionCallbackWithoutResult() { + @Override + protected void doInTransactionWithoutResult(TransactionStatus status) { + try { + + for (final String queue : QUEUES) { + jmsTemplate.send(queue + "," + AUDIT, new MessageCreator() { + public Message createMessage(Session session) throws JMSException { + return session.createTextMessage("P1: " + queue + " - " + BODY); + } + }); + Thread.sleep((long) (Math.random() * SLEEP)); + LOG.info("P1: Send msg to " + queue + "," + AUDIT); + } + + } catch (Exception e) { + Assert.fail("Exception occurred " + e); + } + + + } + }); + } catch (TransactionException e) { + lastException = e; + break; + } + + } + } + + public Exception getLastException() { + return lastException; + } + } + + + ProducerThread t1 = new ProducerThread(jmsTemplate, txManager); + ProducerThread t2 = new ProducerThread(jmsTemplate2, txManager2); + + t1.start(); + t2.start(); + + t1.join(); + t2.join(); + + if (t1.getLastException() != null) { + Assert.fail("Exception occurred " + t1.getLastException()); + } + + if (t2.getLastException() != null) { + Assert.fail("Exception occurred " + t2.getLastException()); + } + + } + +} diff --git a/activemq-spring/src/test/resources/spring/xa.xml b/activemq-spring/src/test/resources/spring/xa.xml new file mode 100644 index 0000000000..691fce05b9 --- /dev/null +++ b/activemq-spring/src/test/resources/spring/xa.xml @@ -0,0 +1,90 @@ + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + \ No newline at end of file