mirror of https://github.com/apache/activemq.git
https://issues.apache.org/activemq/browse/AMQ-2950 - additional fix to support parallel transactions
git-svn-id: https://svn.apache.org/repos/asf/activemq/trunk@1005794 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
parent
8e70e010a0
commit
8e83b7f123
|
@ -58,7 +58,7 @@ public class TransactionBroker extends BrokerFilter {
|
|||
|
||||
// The prepared XA transactions.
|
||||
private TransactionStore transactionStore;
|
||||
private Map<TransactionId, Transaction> xaTransactions = new LinkedHashMap<TransactionId, Transaction>();
|
||||
private Map<TransactionId, XATransaction> xaTransactions = new LinkedHashMap<TransactionId, XATransaction>();
|
||||
private ActiveMQMessageAudit audit;
|
||||
|
||||
public TransactionBroker(Broker next, TransactionStore transactionStore) {
|
||||
|
@ -125,7 +125,7 @@ public class TransactionBroker extends BrokerFilter {
|
|||
public TransactionId[] getPreparedTransactions(ConnectionContext context) throws Exception {
|
||||
List<TransactionId> txs = new ArrayList<TransactionId>();
|
||||
synchronized (xaTransactions) {
|
||||
for (Iterator<Transaction> iter = xaTransactions.values().iterator(); iter.hasNext();) {
|
||||
for (Iterator<XATransaction> iter = xaTransactions.values().iterator(); iter.hasNext();) {
|
||||
Transaction tx = iter.next();
|
||||
if (tx.isPrepared()) {
|
||||
if (LOG.isDebugEnabled()) {
|
||||
|
@ -146,13 +146,13 @@ public class TransactionBroker extends BrokerFilter {
|
|||
public void beginTransaction(ConnectionContext context, TransactionId xid) throws Exception {
|
||||
// the transaction may have already been started.
|
||||
if (xid.isXATransaction()) {
|
||||
Transaction transaction = null;
|
||||
XATransaction transaction = null;
|
||||
synchronized (xaTransactions) {
|
||||
transaction = xaTransactions.get(xid);
|
||||
if (transaction != null) {
|
||||
return;
|
||||
}
|
||||
transaction = new XATransaction(transactionStore, (XATransactionId)xid, this);
|
||||
transaction = new XATransaction(transactionStore, (XATransactionId)xid, this, context.getConnectionId());
|
||||
xaTransactions.put(xid, transaction);
|
||||
}
|
||||
} else {
|
||||
|
@ -252,9 +252,10 @@ public class TransactionBroker extends BrokerFilter {
|
|||
iter.remove();
|
||||
}
|
||||
|
||||
for (Transaction tx : xaTransactions.values()) {
|
||||
|
||||
for (XATransaction tx : xaTransactions.values()) {
|
||||
try {
|
||||
if (!tx.isPrepared()) {
|
||||
if (tx.getConnectionId().equals(info.getConnectionId()) && !tx.isPrepared()) {
|
||||
tx.rollback();
|
||||
}
|
||||
} catch (Exception e) {
|
||||
|
|
|
@ -20,6 +20,7 @@ import java.io.IOException;
|
|||
import javax.transaction.xa.XAException;
|
||||
import javax.transaction.xa.XAResource;
|
||||
import org.apache.activemq.broker.TransactionBroker;
|
||||
import org.apache.activemq.command.ConnectionId;
|
||||
import org.apache.activemq.command.TransactionId;
|
||||
import org.apache.activemq.command.XATransactionId;
|
||||
import org.apache.activemq.store.TransactionStore;
|
||||
|
@ -36,11 +37,13 @@ public class XATransaction extends Transaction {
|
|||
private final TransactionStore transactionStore;
|
||||
private final XATransactionId xid;
|
||||
private final TransactionBroker broker;
|
||||
private final ConnectionId connectionId;
|
||||
|
||||
public XATransaction(TransactionStore transactionStore, XATransactionId xid, TransactionBroker broker) {
|
||||
public XATransaction(TransactionStore transactionStore, XATransactionId xid, TransactionBroker broker, ConnectionId connectionId) {
|
||||
this.transactionStore = transactionStore;
|
||||
this.xid = xid;
|
||||
this.broker = broker;
|
||||
this.connectionId = connectionId;
|
||||
if (LOG.isDebugEnabled()) {
|
||||
LOG.debug("XA Transaction new/begin : " + xid);
|
||||
}
|
||||
|
@ -199,6 +202,10 @@ public class XATransaction extends Transaction {
|
|||
broker.removeTransaction(xid);
|
||||
}
|
||||
|
||||
public ConnectionId getConnectionId() {
|
||||
return connectionId;
|
||||
}
|
||||
|
||||
@Override
|
||||
public TransactionId getTransactionId() {
|
||||
return xid;
|
||||
|
|
|
@ -114,6 +114,29 @@
|
|||
<artifactId>log4j</artifactId>
|
||||
<scope>test</scope>
|
||||
</dependency>
|
||||
<dependency>
|
||||
<groupId>org.jencks</groupId>
|
||||
<artifactId>jencks</artifactId>
|
||||
<version>2.2</version>
|
||||
<scope>test</scope>
|
||||
</dependency>
|
||||
<dependency>
|
||||
<groupId>org.slf4j</groupId>
|
||||
<artifactId>slf4j-api</artifactId>
|
||||
<version>1.4.3</version>
|
||||
<scope>test</scope>
|
||||
</dependency>
|
||||
<dependency>
|
||||
<groupId>org.slf4j</groupId>
|
||||
<artifactId>slf4j-log4j12</artifactId>
|
||||
<version>1.4.3</version>
|
||||
<scope>test</scope>
|
||||
</dependency>
|
||||
<dependency>
|
||||
<groupId>${project.groupId}</groupId>
|
||||
<artifactId>activemq-ra</artifactId>
|
||||
<scope>test</scope>
|
||||
</dependency>
|
||||
<dependency>
|
||||
<groupId>org.springframework.osgi</groupId>
|
||||
<artifactId>spring-osgi-core</artifactId>
|
||||
|
|
|
@ -0,0 +1,156 @@
|
|||
/**
|
||||
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||
* contributor license agreements. See the NOTICE file distributed with
|
||||
* this work for additional information regarding copyright ownership.
|
||||
* The ASF licenses this file to You under the Apache License, Version 2.0
|
||||
* (the "License"); you may not use this file except in compliance with
|
||||
* the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package org.apache.activemq.spring;
|
||||
|
||||
import org.apache.commons.logging.Log;
|
||||
import org.apache.commons.logging.LogFactory;
|
||||
import org.junit.Assert;
|
||||
import org.junit.Test;
|
||||
import org.junit.runner.RunWith;
|
||||
import org.springframework.jms.core.JmsTemplate;
|
||||
import org.springframework.jms.core.MessageCreator;
|
||||
import org.springframework.test.annotation.DirtiesContext;
|
||||
import org.springframework.test.context.ContextConfiguration;
|
||||
import org.springframework.test.context.junit4.SpringJUnit4ClassRunner;
|
||||
import org.springframework.test.context.transaction.TransactionConfiguration;
|
||||
import org.springframework.transaction.PlatformTransactionManager;
|
||||
import org.springframework.transaction.TransactionException;
|
||||
import org.springframework.transaction.TransactionStatus;
|
||||
import org.springframework.transaction.support.TransactionCallbackWithoutResult;
|
||||
import org.springframework.transaction.support.TransactionTemplate;
|
||||
|
||||
import javax.annotation.Resource;
|
||||
import javax.jms.JMSException;
|
||||
import javax.jms.Message;
|
||||
import javax.jms.Session;
|
||||
import java.util.Arrays;
|
||||
|
||||
@RunWith(SpringJUnit4ClassRunner.class)
|
||||
|
||||
@ContextConfiguration(locations = {"classpath:spring/xa.xml"})
|
||||
@TransactionConfiguration(transactionManager = "transactionManager", defaultRollback = false)
|
||||
public class ParallelXATransactionTest {
|
||||
|
||||
private static final Log LOG = LogFactory.getLog(ParallelXATransactionTest.class);
|
||||
|
||||
@Resource(name = "transactionManager")
|
||||
PlatformTransactionManager txManager = null;
|
||||
|
||||
@Resource(name = "transactionManager2")
|
||||
PlatformTransactionManager txManager2 = null;
|
||||
|
||||
|
||||
@Resource(name = "jmsTemplate")
|
||||
JmsTemplate jmsTemplate = null;
|
||||
|
||||
@Resource(name = "jmsTemplate2")
|
||||
JmsTemplate jmsTemplate2 = null;
|
||||
|
||||
|
||||
public static final int NB_MSG = 100;
|
||||
public static final String BODY = Arrays.toString(new int[1024]);
|
||||
private static final String[] QUEUES = {"TEST.queue1", "TEST.queue2", "TEST.queue3", "TEST.queue4", "TEST.queue5"};
|
||||
private static final String AUDIT = "TEST.audit";
|
||||
public static final int SLEEP = 500;
|
||||
|
||||
@Test
|
||||
@DirtiesContext
|
||||
public void testParalellXaTx() throws Exception {
|
||||
|
||||
|
||||
class ProducerThread extends Thread {
|
||||
|
||||
PlatformTransactionManager txManager;
|
||||
JmsTemplate jmsTemplate;
|
||||
Exception lastException;
|
||||
|
||||
|
||||
public ProducerThread(JmsTemplate jmsTemplate, PlatformTransactionManager txManager) {
|
||||
this.jmsTemplate = jmsTemplate;
|
||||
this.txManager = txManager;
|
||||
}
|
||||
|
||||
public void run() {
|
||||
int i = 0;
|
||||
while (i++ < 10) {
|
||||
|
||||
try {
|
||||
Thread.sleep((long) (Math.random() * SLEEP));
|
||||
} catch (InterruptedException e) {
|
||||
e.printStackTrace();
|
||||
}
|
||||
TransactionTemplate tt = new TransactionTemplate(this.txManager);
|
||||
|
||||
|
||||
try {
|
||||
tt.execute(new TransactionCallbackWithoutResult() {
|
||||
@Override
|
||||
protected void doInTransactionWithoutResult(TransactionStatus status) {
|
||||
try {
|
||||
|
||||
for (final String queue : QUEUES) {
|
||||
jmsTemplate.send(queue + "," + AUDIT, new MessageCreator() {
|
||||
public Message createMessage(Session session) throws JMSException {
|
||||
return session.createTextMessage("P1: " + queue + " - " + BODY);
|
||||
}
|
||||
});
|
||||
Thread.sleep((long) (Math.random() * SLEEP));
|
||||
LOG.info("P1: Send msg to " + queue + "," + AUDIT);
|
||||
}
|
||||
|
||||
} catch (Exception e) {
|
||||
Assert.fail("Exception occurred " + e);
|
||||
}
|
||||
|
||||
|
||||
}
|
||||
});
|
||||
} catch (TransactionException e) {
|
||||
lastException = e;
|
||||
break;
|
||||
}
|
||||
|
||||
}
|
||||
}
|
||||
|
||||
public Exception getLastException() {
|
||||
return lastException;
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
ProducerThread t1 = new ProducerThread(jmsTemplate, txManager);
|
||||
ProducerThread t2 = new ProducerThread(jmsTemplate2, txManager2);
|
||||
|
||||
t1.start();
|
||||
t2.start();
|
||||
|
||||
t1.join();
|
||||
t2.join();
|
||||
|
||||
if (t1.getLastException() != null) {
|
||||
Assert.fail("Exception occurred " + t1.getLastException());
|
||||
}
|
||||
|
||||
if (t2.getLastException() != null) {
|
||||
Assert.fail("Exception occurred " + t2.getLastException());
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
}
|
|
@ -0,0 +1,90 @@
|
|||
<?xml version="1.0" encoding="UTF-8"?>
|
||||
<!--
|
||||
Licensed to the Apache Software Foundation (ASF) under one or more
|
||||
contributor license agreements. See the NOTICE file distributed with
|
||||
this work for additional information regarding copyright ownership.
|
||||
The ASF licenses this file to You under the Apache License, Version 2.0
|
||||
(the "License"); you may not use this file except in compliance with
|
||||
the License. You may obtain a copy of the License at
|
||||
|
||||
http://www.apache.org/licenses/LICENSE-2.0
|
||||
|
||||
Unless required by applicable law or agreed to in writing, software
|
||||
distributed under the License is distributed on an "AS IS" BASIS,
|
||||
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
See the License for the specific language governing permissions and
|
||||
limitations under the License.
|
||||
-->
|
||||
<beans xmlns="http://www.springframework.org/schema/beans"
|
||||
xmlns:tx="http://www.springframework.org/schema/tx" xmlns:context="http://www.springframework.org/schema/context"
|
||||
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
|
||||
xmlns:amq="http://activemq.apache.org/schema/core"
|
||||
xsi:schemaLocation="
|
||||
http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-2.0.xsd
|
||||
http://www.springframework.org/schema/context http://www.springframework.org/schema/context/spring-context-2.5.xsd
|
||||
http://www.springframework.org/schema/tx http://www.springframework.org/schema/tx/spring-tx-2.0.xsd
|
||||
http://activemq.apache.org/schema/core http://activemq.apache.org/schema/core/activemq-core.xsd">
|
||||
|
||||
<!-- broker -->
|
||||
|
||||
<amq:broker brokerName="test" useJmx="false" persistent="false">
|
||||
<amq:transportConnectors>
|
||||
<amq:transportConnector name="transport" uri="nio://0.0.0.0:61616"/>
|
||||
</amq:transportConnectors>
|
||||
</amq:broker>
|
||||
|
||||
<!-- simple tx -->
|
||||
|
||||
<bean id="jmsTemplate2" class="org.springframework.jms.core.JmsTemplate">
|
||||
<property name="connectionFactory" ref="connectionFactory2"/>
|
||||
</bean>
|
||||
|
||||
|
||||
<bean id="transactionManager2" class="org.springframework.jms.connection.JmsTransactionManager">
|
||||
<property name="connectionFactory" ref="connectionFactory2"/>
|
||||
</bean>
|
||||
|
||||
|
||||
<bean id="connectionFactory2" class="org.apache.activemq.ActiveMQConnectionFactory">
|
||||
<property name="brokerURL" value="tcp://localhost:61616"/>
|
||||
<property name="userName" value="smx"/>
|
||||
<property name="password" value="smx"/>
|
||||
</bean>
|
||||
|
||||
<!-- xa tx -->
|
||||
|
||||
<bean id="jmsTemplate" class="org.springframework.jms.core.JmsTemplate">
|
||||
<property name="connectionFactory" ref="connectionFactory"/>
|
||||
</bean>
|
||||
|
||||
<bean id="transactionManager" class="org.jencks.factory.TransactionManagerFactoryBean">
|
||||
<property name="defaultTransactionTimeoutSeconds" value="300"/>
|
||||
</bean>
|
||||
|
||||
<bean id="connectionFactory" class="org.jencks.factory.ConnectionFactoryFactoryBean">
|
||||
<property name="connectionManager" ref="jmsConnectionManager"/>
|
||||
<property name="managedConnectionFactory" ref="jmsManagedConnectionFactory"/>
|
||||
</bean>
|
||||
|
||||
<bean id="jmsConnectionManager" class="org.jencks.factory.ConnectionManagerFactoryBean">
|
||||
<property name="transaction" value="xa"/>
|
||||
<property name="transactionManager" ref="transactionManager"/>
|
||||
<property name="poolMaxSize" value="20"/>
|
||||
<property name="connectionTracker">
|
||||
<bean class="org.jencks.factory.ConnectionTrackerFactoryBean">
|
||||
<property name="geronimoTransactionManager" ref="transactionManager"/>
|
||||
</bean>
|
||||
</property>
|
||||
</bean>
|
||||
|
||||
<bean id="jmsManagedConnectionFactory" class="org.apache.activemq.ra.ActiveMQManagedConnectionFactory">
|
||||
<property name="resourceAdapter">
|
||||
<bean class="org.apache.activemq.ra.ActiveMQResourceAdapter">
|
||||
<property name="serverUrl" value="tcp://localhost:61616"/>
|
||||
<property name="maximumRedeliveries" value="6"/>
|
||||
<property name="allPrefetchValues" value="1"/>
|
||||
</bean>
|
||||
</property>
|
||||
</bean>
|
||||
|
||||
</beans>
|
Loading…
Reference in New Issue