https://issues.apache.org/jira/browse/AMQ-3305 - Implement "exactly once" delivery with kahaDB and XA in the event of a failure post prepare.

Fixed up transaction broker recovery processing and kahadb store such that pending recovered
messages and acks wait for and respect the eventual xa transction outcome. Essentially
implementing exactly once delivery semantics on failure. Updated the camel jms to jdbc
test route to validate correct failure recovery processing with geronimo, test pulls in
xa wrappers from jencks and openejb such that NamedXAResources are registered with geronimo.
Additional unit tests added.

git-svn-id: https://svn.apache.org/repos/asf/activemq/trunk@1100208 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Gary Tully 2011-05-06 13:27:49 +00:00
parent ddca4cf590
commit 101e7110bc
21 changed files with 647 additions and 252 deletions

View File

@ -132,7 +132,20 @@
<dependency>
<groupId>org.apache.geronimo.components</groupId>
<artifactId>geronimo-transaction</artifactId>
<version>2.1</version>
<optional>true</optional>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.objectweb.howl</groupId>
<artifactId>howl</artifactId>
<version>1.0.1-1</version>
<optional>true</optional>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.apache.openejb</groupId>
<artifactId>openejb-core</artifactId>
<version>3.1.2</version>
<optional>true</optional>
<scope>test</scope>
</dependency>
@ -143,6 +156,18 @@
<optional>true</optional>
<scope>test</scope>
</dependency>
<dependency>
<groupId>${project.groupId}</groupId>
<artifactId>activemq-spring</artifactId>
<optional>true</optional>
<scope>test</scope>
</dependency>
<dependency>
<groupId>${project.groupId}</groupId>
<artifactId>activemq-ra</artifactId>
<optional>true</optional>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.jencks</groupId>
<artifactId>jencks-amqpool</artifactId>

View File

@ -19,7 +19,6 @@ package org.apache.activemq.camel;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import javax.jms.Connection;
import javax.jms.MessageProducer;
import javax.jms.Session;
@ -33,18 +32,19 @@ import org.apache.activemq.command.ActiveMQQueue;
import org.apache.activemq.command.TransactionId;
import org.apache.activemq.util.Wait;
import org.apache.camel.spring.SpringTestSupport;
import org.apache.commons.dbcp.BasicDataSource;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.enhydra.jdbc.pool.StandardXAPoolDataSource;
import org.springframework.context.support.AbstractXmlApplicationContext;
import org.springframework.context.support.ClassPathXmlApplicationContext;
/**
* shows broker heuristic rollback (no prepare memory), hence duplicate message delivery
* shows broker 'once only delivery' and recovery with XA
*/
public class JmsJdbcXATest extends SpringTestSupport {
private static final Logger LOG = LoggerFactory.getLogger(JmsJdbcXATest.class);
BrokerService broker = null;
int messageCount;
public java.sql.Connection initDb() throws Exception {
String createStatement =
@ -55,9 +55,7 @@ public class JmsJdbcXATest extends SpringTestSupport {
"messageContent varchar(2048) NOT NULL, " +
"PRIMARY KEY (id) )";
java.sql.Connection conn = null;
StandardXAPoolDataSource pool = getMandatoryBean(StandardXAPoolDataSource.class, "jdbcEnhydraXaDataSource");
conn = pool.getConnection();
java.sql.Connection conn = getJDBCConnection();
try {
conn.createStatement().execute(createStatement);
} catch (SQLException alreadyExists) {
@ -73,6 +71,11 @@ public class JmsJdbcXATest extends SpringTestSupport {
return conn;
}
private java.sql.Connection getJDBCConnection() throws Exception {
BasicDataSource dataSource = getMandatoryBean(BasicDataSource.class, "managedDataSourceWithRecovery");
return dataSource.getConnection();
}
private int dumpDb(java.sql.Connection jdbcConn) throws Exception {
int count = 0;
ResultSet resultSet = jdbcConn.createStatement().executeQuery("SELECT * FROM SCP_INPUT_MESSAGES");
@ -86,10 +89,86 @@ public class JmsJdbcXATest extends SpringTestSupport {
return count;
}
public void testRecovery() throws Exception {
public void testRecoveryCommit() throws Exception {
java.sql.Connection jdbcConn = initDb();
broker = createBroker(true);
broker.setPlugins(new BrokerPlugin[]{
sendJMSMessageToKickOffRoute();
LOG.info("waiting for route to kick in, it will kill the broker on first 2pc commit");
// will be stopped by the plugin on first 2pc commit
broker.waitUntilStopped();
assertEquals("message in db, commit to db worked", 1, dumpDb(jdbcConn));
LOG.info("Broker stopped, restarting...");
broker = createBroker(false);
broker.start();
broker.waitUntilStarted();
assertEquals("pending transactions", 1, broker.getBroker().getPreparedTransactions(null).length);
// TM stays actively committing first message ack which won't get redelivered - xa once only delivery
LOG.info("waiting for recovery to complete");
assertTrue("recovery complete in time", Wait.waitFor(new Wait.Condition() {
@Override
public boolean isSatisified() throws Exception {
return broker.getBroker().getPreparedTransactions(null).length == 0;
}
}));
// verify recovery complete
assertEquals("recovery complete", 0, broker.getBroker().getPreparedTransactions(null).length);
final java.sql.Connection freshConnection = getJDBCConnection();
assertTrue("did not get replay", Wait.waitFor(new Wait.Condition() {
@Override
public boolean isSatisified() throws Exception {
return 1 == dumpDb(freshConnection);
}
}));
assertEquals("still one message in db", 1, dumpDb(freshConnection));
// let once complete ok
sendJMSMessageToKickOffRoute();
assertTrue("got second message", Wait.waitFor(new Wait.Condition() {
@Override
public boolean isSatisified() throws Exception {
return 2 == dumpDb(freshConnection);
}
}));
assertEquals("two messages in db", 2, dumpDb(freshConnection));
}
private void sendJMSMessageToKickOffRoute() throws Exception {
ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory("vm://testXA");
factory.setWatchTopicAdvisories(false);
Connection connection = factory.createConnection();
connection.start();
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
MessageProducer producer = session.createProducer(new ActiveMQQueue("scp_transacted"));
TextMessage message = session.createTextMessage("Some Text, messageCount:" + messageCount++);
message.setJMSCorrelationID("pleaseCorrelate");
producer.send(message);
connection.close();
}
private BrokerService createBroker(boolean deleteAllMessages) throws Exception {
BrokerService brokerService = new BrokerService();
brokerService.setDeleteAllMessagesOnStartup(deleteAllMessages);
brokerService.setBrokerName("testXA");
brokerService.setAdvisorySupport(false);
brokerService.setUseJmx(false);
brokerService.setDataDirectory("target/data");
brokerService.addConnector("tcp://0.0.0.0:61616");
return brokerService;
}
@Override
protected AbstractXmlApplicationContext createApplicationContext() {
deleteDirectory("target/data/howl");
// make broker available to recovery processing on app context start
try {
broker = createBroker(true);
broker.setPlugins(new BrokerPlugin[]{
new BrokerPluginSupport() {
@Override
public void commitTransaction(ConnectionContext context,
@ -113,54 +192,12 @@ public class JmsJdbcXATest extends SpringTestSupport {
}
}
}
});
broker.start();
});
broker.start();
} catch (Exception e) {
throw new RuntimeException("Failed to start broker", e);
}
final java.sql.Connection jdbcConn = initDb();
ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory("vm://testXA");
factory.setWatchTopicAdvisories(false);
Connection connection = factory.createConnection();
connection.start();
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
MessageProducer producer = session.createProducer(new ActiveMQQueue("scp_transacted"));
TextMessage message = session.createTextMessage("Some Text");
message.setJMSCorrelationID("pleaseCorrelate");
producer.send(message);
LOG.info("waiting for route to kick in, it will kill the broker on first 2pc commit");
// will be stopped by the plugin on first 2pc commit
broker.waitUntilStopped();
assertEquals("message in db, commit to db worked", 1, dumpDb(jdbcConn));
LOG.info("Broker stopped, restarting...");
broker = createBroker(false);
broker.start();
broker.waitUntilStarted();
LOG.info("waiting for completion or route with replayed message");
assertTrue("got a second message in the db", Wait.waitFor(new Wait.Condition() {
@Override
public boolean isSatisified() throws Exception {
return 2 == dumpDb(jdbcConn);
}
}));
assertEquals("message in db", 2, dumpDb(jdbcConn));
}
private BrokerService createBroker(boolean deleteAllMessages) throws Exception {
BrokerService brokerService = new BrokerService();
brokerService.setDeleteAllMessagesOnStartup(deleteAllMessages);
brokerService.setBrokerName("testXA");
brokerService.setAdvisorySupport(false);
brokerService.setUseJmx(false);
brokerService.setDataDirectory("target/data");
brokerService.addConnector("tcp://0.0.0.0:61616");
return brokerService;
}
@Override
protected AbstractXmlApplicationContext createApplicationContext() {
return new ClassPathXmlApplicationContext("org/apache/activemq/camel/jmsXajdbc.xml");
}
}

View File

@ -14,107 +14,102 @@
See the License for the specific language governing permissions and
limitations under the License.
-->
<!-- START SNIPPET: example -->
<!-- START SNIPPET: jms_jdbc_xa -->
<beans xmlns="http://www.springframework.org/schema/beans"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xmlns:context="http://www.springframework.org/schema/context"
xsi:schemaLocation="
http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-2.0.xsd
http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-3.0.xsd
http://www.springframework.org/schema/context http://www.springframework.org/schema/context/spring-context-3.0.xsd
http://camel.apache.org/schema/spring http://camel.apache.org/schema/spring/camel-spring.xsd
http://activemq.apache.org/schema/core http://activemq.apache.org/schema/core/activemq-core.xsd
">
<!-- build broker in code so it can be restarted and modified to test recovery -->
<context:annotation-config />
<!-- broker creation in code so it can be restarted and modified to test recovery -->
<!-- some logging that can help
log4j.logger.org.apache.activemq.TransactionContext=TRACE
log4j.logger.org.springframework.transaction.support.AbstractPlatformTransactionManager=TRACE
log4j.logger.org.apache.geronimo.transaction.manager=TRACE
log4j.logger.org.enhydra.jdbc=TRACE
-->
<!-- use jencks factory beans to easily configure howl and geronimo transaction manager -->
<bean id="xidFactory" class="org.apache.geronimo.transaction.manager.XidFactoryImpl"/>
<!-- Transaction log -->
<bean id="transactionLog" class="org.jencks.factory.HowlLogFactoryBean">
<property name="logFileDir" value="target/data/howl/txlog"/>
<property name="xidFactory" ref="xidFactory"/>
</bean>
<bean id="jenckTransactionManager" class="org.jencks.factory.TransactionManagerFactoryBean">
<property name="transactionLog" ref="transactionLog"/>
</bean>
<!-- XID factory -->
<bean id="xidFactory" class="org.apache.geronimo.transaction.manager.XidFactoryImpl" />
<bean id="activemqConnectionFactory" class="org.apache.activemq.ActiveMQConnectionFactory">
<property name="brokerURL" value="tcp://localhost:61616?jms.dispatchAsync=false"/>
</bean>
<!-- Transaction log -->
<bean id="transactionLog" class="org.jencks.factory.HowlLogFactoryBean">
<property name="logFileDir" value="target/data/howl/txlog"/>
<property name="xidFactory" ref="xidFactory"/>
</bean>
<!-- register ActiveMQ with Geronimo to allow out of band transaction recovery/completion on a new connection
the resourceName gives the ActiveMQ XAResource an identity, Geronimo NamedXAResource in the transaction log
-->
<bean id="resourceManager" class="org.apache.activemq.pool.ActiveMQResourceManager" init-method="recoverResource">
<property name="transactionManager" ref="jenckTransactionManager"/>
<property name="connectionFactory" ref="activemqConnectionFactory"/>
<property name="resourceName" value="activemq.broker"/>
</bean>
<!-- Setup the geronimo transaction manager -->
<bean id="jenckTransactionManager" class="org.jencks.factory.TransactionManagerFactoryBean">
<property name="transactionLog" ref="transactionLog"/>
</bean>
<bean id="pooledConnectionFactory" class="org.apache.activemq.pool.PooledConnectionFactoryBean"
depends-on="jenckTransactionManager">
<property name="maxConnections" value="1"/>
<property name="transactionManager" ref="jenckTransactionManager"/>
<property name="connectionFactory" ref="activemqConnectionFactory"/>
<property name="resourceName" value="activemq.broker"/>
</bean>
<!-- Configure the Spring framework to use JTA transactions from Geronimo -->
<bean id="jtaTransactionManager" class="org.springframework.transaction.jta.JtaTransactionManager">
<property name="transactionManager" ref="jenckTransactionManager"/>
</bean>
<!-- Configure the Spring framework (used by camel) to use JTA transactions from Geronimo -->
<bean id="jtaTransactionManager" class="org.springframework.transaction.jta.JtaTransactionManager">
<property name="transactionManager" ref="jenckTransactionManager"/>
</bean>
<!-- Using the jencks ActiveMQ pool to enable XA -->
<bean id="jmsXaConnectionFactory" class="org.jencks.amqpool.XaPooledConnectionFactory">
<constructor-arg value="tcp://localhost:61616" />
<property name="maxConnections" value="8" />
<property name="transactionManager" ref="jenckTransactionManager" />
</bean>
<!-- Define the activemq Camel component so we can integrate with the AMQ broker -->
<bean id="activemq" class="org.apache.activemq.camel.component.ActiveMQComponent"
depends-on="pooledConnectionFactory">
<property name="transacted" value="true"/>
<property name="transactionManager" ref="jtaTransactionManager"/>
<property name="connectionFactory" ref="pooledConnectionFactory"/>
<!-- cache level is important, can be cache connection or none, as session needs to be enlisted
in the current transaction they can't be cached, with default cache sessions, they are created
up front, before the transaction (required for the route) -->
<property name="cacheLevel" value="0"/>
</bean>
<!-- Define the activemq Camel component so we can integrate with the AMQ broker -->
<bean id="activemq" class="org.apache.activemq.camel.component.ActiveMQComponent">
<property name="transacted" value="true"/>
<property name="transactionManager" ref="jtaTransactionManager"/>
<property name="connectionFactory" ref="jmsXaConnectionFactory"/>
<!-- cache level is important, can be cache connection or none, as session needs to be enlisted
in the current transaction they can't be cached, with default cache sessions, they are created
up front, before the transaction (required for the route) -->
<property name="cacheLevel" value="0" />
</bean>
<!-- openejb provides geronimo NamedXAResources wrapper around commons dbcp such that they have an identity in the howl log -->
<bean id="geronimoXAResourceWrapper"
class="org.apache.openejb.resource.GeronimoTransactionManagerFactory.GeronimoXAResourceWrapper"/>
<bean id="managedDataSourceWithRecovery" class="org.apache.openejb.resource.jdbc.ManagedDataSourceWithRecovery">
<constructor-arg>
<ref bean="geronimoXAResourceWrapper"></ref>
</constructor-arg>
<property name="jdbcDriver" value="org.apache.derby.jdbc.EmbeddedDriver"/>
<property name="jdbcUrl" value="jdbc:derby:target/XatestDs;create=true"/>
<property name="transactionManager" ref="jenckTransactionManager"/>
</bean>
<!-- Setup the connection manager -->
<bean id="connectionManager" class="org.jencks.factory.ConnectionManagerFactoryBean">
<property name="transactionManager" ref="jenckTransactionManager" />
<property name="transaction" value="xa" />
</bean>
<bean id="required" class="org.apache.camel.spring.spi.SpringTransactionPolicy">
<property name="transactionManager" ref="jenckTransactionManager"/>
<property name="propagationBehaviorName" value="PROPAGATION_REQUIRED"/>
</bean>
<!-- Setup the JDBC Managed Connection Factory (that supports XA) -->
<!--bean id="jdbcManagedConnectionFactory" class="org.jencks.tranql.XAPoolDataSourceMCF">
<property name="driverName" value="com.mysql.jdbc.Driver"/>
<property name="url" value="jdbc:mysql://localhost/ScpBuffer?relaxAutoCommit=true"/>
<property name="user" value="rails"/>
<property name="password" value="rails"/>
</bean -->
<bean id="jdbcEnhydraXaDataSource" class="org.enhydra.jdbc.pool.StandardXAPoolDataSource" destroy-method="shutdown">
<property name="dataSource">
<bean class="org.enhydra.jdbc.standard.StandardXADataSource" destroy-method="shutdown">
<!-- property name="driverName" value="com.mysql.jdbc.Driver" />
<property name="url" value="jdbc:mysql://localhost/ScpBuffer?relaxAutoCommit=true" / -->
<!-- try embedded derby xa -->
<property name="driverName" value="org.apache.derby.jdbc.EmbeddedDriver" />
<property name="url" value="jdbc:derby:target/XatestDs;create=true" />
<property name="transactionManager" ref="jenckTransactionManager" />
</bean>
</property>
<property name="transactionManager" ref="jenckTransactionManager" />
</bean>
<bean id="required" class="org.apache.camel.spring.spi.SpringTransactionPolicy">
<property name="transactionManager" ref="jenckTransactionManager"/>
<property name="propagationBehaviorName" value="PROPAGATION_REQUIRED"/>
</bean>
<camelContext id="camel" xmlns="http://camel.apache.org/schema/spring">
<route id="queueToDbTransacted">
<from uri="activemq:queue:scp_transacted"/>
<transacted ref="required"/>
<convertBodyTo type="java.lang.String"/>
<to uri="log:BeforeSettingBody?showAll=true"/>
<setBody>
<simple>INSERT INTO SCP_INPUT_MESSAGES(messageId, messageCorrelationId, messageContent) VALUES('${in.header.JMSMessageId}','${in.header.JMSCorrelationId}','${in.body}')</simple>
</setBody>
<to uri="jdbc:jdbcEnhydraXaDataSource"/>
</route>
</camelContext>
<!-- the route, from jms to jdbc in an xa transaction -->
<camelContext id="camel" xmlns="http://camel.apache.org/schema/spring">
<route id="queueToDbTransacted">
<from uri="activemq:queue:scp_transacted"/>
<transacted ref="required"/>
<convertBodyTo type="java.lang.String"/>
<to uri="log:BeforeSettingBody?showAll=true"/>
<setBody>
<simple>INSERT INTO SCP_INPUT_MESSAGES(messageId, messageCorrelationId, messageContent)
VALUES('${in.header.JMSMessageId}','${in.header.JMSCorrelationId}','${in.body}')
</simple>
</setBody>
<to uri="jdbc:managedDataSourceWithRecovery"/>
</route>
</camelContext>
</beans>
<!-- END SNIPPET: jms_jdbc_xa -->

View File

@ -437,10 +437,6 @@
<!-- these seem to fail only in m2 -->
<!--<exclude>**/TransactedTopicMasterSlaveTest.*</exclude>-->
<!-- Kaha in flux - removing tests -->
<exclude>**/KahaDBStoreXARecoveryBrokerTest.*</exclude>
<exclude>**/KahaDBStoreRecoveryBrokerTest.*</exclude>
<!-- Multicast and UDP based tests fail on GBuild -->
<exclude>**/PeerTransportTest.*</exclude>
<exclude>**/MulticastTransportTest.*</exclude>

View File

@ -135,17 +135,21 @@ public class TransactionContext implements XAResource {
return;
}
Throwable firstException = null;
int size = synchronizations.size();
try {
for (int i = 0; i < size; i++) {
for (int i = 0; i < size; i++) {
try {
synchronizations.get(i).afterRollback();
} catch (Throwable t) {
LOG.debug("Exception from afterRollback on " + synchronizations.get(i), t);
if (firstException == null) {
firstException = t;
}
}
} catch (JMSException e) {
throw e;
} catch (Throwable e) {
throw JMSExceptionSupport.create(e);
} finally {
synchronizations = null;
}
synchronizations = null;
if (firstException != null) {
throw JMSExceptionSupport.create(firstException);
}
}
@ -154,17 +158,21 @@ public class TransactionContext implements XAResource {
return;
}
Throwable firstException = null;
int size = synchronizations.size();
try {
for (int i = 0; i < size; i++) {
for (int i = 0; i < size; i++) {
try {
synchronizations.get(i).afterCommit();
} catch (Throwable t) {
LOG.debug("Exception from afterCommit on " + synchronizations.get(i), t);
if (firstException == null) {
firstException = t;
}
}
} catch (JMSException e) {
throw e;
} catch (Throwable e) {
throw JMSExceptionSupport.create(e);
} finally {
synchronizations = null;
}
synchronizations = null;
if (firstException != null) {
throw JMSExceptionSupport.create(firstException);
}
}
@ -528,7 +536,11 @@ public class TransactionContext implements XAResource {
List<TransactionContext> l = ENDED_XA_TRANSACTION_CONTEXTS.remove(x);
if (l != null && !l.isEmpty()) {
for (TransactionContext ctx : l) {
ctx.afterCommit();
try {
ctx.afterCommit();
} catch (Exception ignored) {
LOG.debug("ignoring exception from after completion on ended transaction: " + ignored, ignored);
}
}
}

View File

@ -17,13 +17,21 @@
package org.apache.activemq.broker;
import java.util.*;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import javax.jms.JMSException;
import javax.transaction.xa.XAException;
import org.apache.activemq.ActiveMQMessageAudit;
import org.apache.activemq.broker.region.Destination;
import org.apache.activemq.broker.region.Queue;
import org.apache.activemq.command.ActiveMQDestination;
import org.apache.activemq.command.BaseCommand;
import org.apache.activemq.command.ConnectionInfo;
import org.apache.activemq.command.LocalTransactionId;
import org.apache.activemq.command.Message;
@ -90,13 +98,15 @@ public class TransactionBroker extends BrokerFilter {
public void recover(XATransactionId xid, Message[] addedMessages, MessageAck[] aks) {
try {
beginTransaction(context, xid);
Transaction transaction = getTransaction(context, xid, false);
for (int i = 0; i < addedMessages.length; i++) {
send(producerExchange, addedMessages[i]);
kickDestinationOnCompletion(context, transaction, addedMessages[i].getDestination(), addedMessages[i]);
}
for (int i = 0; i < aks.length; i++) {
acknowledge(consumerExchange, aks[i]);
kickDestinationOnCompletion(context, transaction, aks[i].getDestination(), aks[i]);
}
prepareTransaction(context, xid);
transaction.setState(Transaction.PREPARED_STATE);
LOG.debug("recovered prepared transaction: " + transaction.getTransactionId());
} catch (Throwable e) {
throw new WrappedException(e);
}
@ -109,6 +119,64 @@ public class TransactionBroker extends BrokerFilter {
next.start();
}
private void kickDestinationOnCompletion(ConnectionContext context, Transaction transaction,
ActiveMQDestination amqDestination, BaseCommand ack) throws Exception {
Destination destination = addDestination(context, amqDestination, false);
registerSync(destination, transaction, ack);
}
private void registerSync(Destination destination, Transaction transaction, BaseCommand command) {
if (destination instanceof Queue) {
Synchronization sync = new PreparedDestinationCompletion((Queue) destination, command.isMessage());
// ensure one per destination in the list
transaction.removeSynchronization(sync);
transaction.addSynchronization(sync);
}
}
static class PreparedDestinationCompletion extends Synchronization {
final Queue queue;
final boolean messageSend;
public PreparedDestinationCompletion(final Queue queue, boolean messageSend) {
this.queue = queue;
// rollback relevant to acks, commit to sends
this.messageSend = messageSend;
}
@Override
public int hashCode() {
return System.identityHashCode(queue) +
System.identityHashCode(Boolean.valueOf(messageSend));
}
@Override
public boolean equals(Object other) {
return other instanceof PreparedDestinationCompletion &&
queue.equals(((PreparedDestinationCompletion) other).queue) &&
messageSend == ((PreparedDestinationCompletion) other).messageSend;
}
@Override
public void afterRollback() throws Exception {
if (!messageSend) {
queue.clearPendingMessages();
if (LOG.isDebugEnabled()) {
LOG.debug("cleared pending from afterRollback : " + queue);
}
}
}
@Override
public void afterCommit() throws Exception {
if (messageSend) {
queue.clearPendingMessages();
if (LOG.isDebugEnabled()) {
LOG.debug("cleared pending from afterCommit : " + queue);
}
}
}
}
public void stop() throws Exception {
transactionStore.stop();
next.stop();
@ -135,7 +203,7 @@ public class TransactionBroker extends BrokerFilter {
XATransactionId rc[] = new XATransactionId[txs.size()];
txs.toArray(rc);
if (LOG.isDebugEnabled()) {
LOG.debug("prepared transacton list size: " + rc.length);
LOG.debug("prepared transaction list size: " + rc.length);
}
return rc;
}
@ -253,7 +321,7 @@ public class TransactionBroker extends BrokerFilter {
// first find all txs that belongs to the connection
ArrayList<XATransaction> txs = new ArrayList<XATransaction>();
for (XATransaction tx : xaTransactions.values()) {
if (tx.getConnectionId().equals(info.getConnectionId()) && !tx.isPrepared()) {
if (tx.getConnectionId() != null && tx.getConnectionId().equals(info.getConnectionId()) && !tx.isPrepared()) {
txs.add(tx);
}
}

View File

@ -1132,6 +1132,19 @@ public class Queue extends BaseDestination implements Task, UsageListener {
getMessages().clear();
}
public void clearPendingMessages() {
messagesLock.writeLock().lock();
try {
if (store != null) {
store.resetBatching();
}
messages.gc();
asyncWakeup();
} finally {
messagesLock.writeLock().unlock();
}
}
/**
* Removes the message matching the given messageId
*/

View File

@ -56,13 +56,16 @@ public abstract class AbstractStoreCursor extends AbstractPendingMessageCursor i
clear();
super.start();
resetBatch();
this.size = getStoreSize();
this.storeHasMessages=this.size > 0;
resetSize();
setCacheEnabled(!this.storeHasMessages&&useCache);
}
}
protected void resetSize() {
this.size = getStoreSize();
this.storeHasMessages=this.size > 0;
}
public final synchronized void stop() throws Exception {
resetBatch();
super.stop();
@ -237,6 +240,7 @@ public abstract class AbstractStoreCursor extends AbstractPendingMessageCursor i
batchList.clear();
clearIterator(false);
batchResetNeeded = true;
resetSize();
setCacheEnabled(false);
}
@ -287,8 +291,9 @@ public abstract class AbstractStoreCursor extends AbstractPendingMessageCursor i
return size;
}
@Override
public String toString() {
return regionDestination.getActiveMQDestination().getPhysicalName() + ",batchResetNeeded=" + batchResetNeeded
return super.toString() + ":" + regionDestination.getActiveMQDestination().getPhysicalName() + ",batchResetNeeded=" + batchResetNeeded
+ ",storeHasMessages=" + this.storeHasMessages + ",size=" + this.size + ",cacheEnabled=" + isCacheEnabled();
}

View File

@ -97,9 +97,4 @@ class QueueStorePrefetch extends AbstractStoreCursor {
this.store.recoverNextMessages(this.maxBatchSize, this);
}
@Override
public String toString() {
return "QueueStorePrefetch" + System.identityHashCode(this);
}
}

View File

@ -16,6 +16,7 @@
*/
package org.apache.activemq.command;
import java.util.ArrayList;
import java.util.Arrays;
import javax.transaction.xa.Xid;
import org.apache.activemq.util.HexSupport;
@ -34,6 +35,7 @@ public class XATransactionId extends TransactionId implements Xid, Comparable {
private transient int hash;
private transient String transactionKey;
private transient ArrayList<MessageAck> preparedAcks;
public XATransactionId() {
}
@ -50,8 +52,17 @@ public class XATransactionId extends TransactionId implements Xid, Comparable {
public synchronized String getTransactionKey() {
if (transactionKey == null) {
transactionKey = "XID:" + formatId + ":" + HexSupport.toHexFromBytes(globalTransactionId) + ":"
+ HexSupport.toHexFromBytes(branchQualifier);
StringBuffer s = new StringBuffer();
s.append("XID:[globalId=");
for (int i = 0; i < globalTransactionId.length; i++) {
s.append(Integer.toHexString(globalTransactionId[i]));
}
s.append(",branchId=");
for (int i = 0; i < branchQualifier.length; i++) {
s.append(Integer.toHexString(branchQualifier[i]));
}
s.append("]");
transactionKey = s.toString();
}
return transactionKey;
}
@ -141,4 +152,11 @@ public class XATransactionId extends TransactionId implements Xid, Comparable {
return getTransactionKey().compareTo(xid.getTransactionKey());
}
public void setPreparedAcks(ArrayList<MessageAck> preparedAcks) {
this.preparedAcks = preparedAcks;
}
public ArrayList<MessageAck> getPreparedAcks() {
return preparedAcks;
}
}

View File

@ -461,6 +461,9 @@ public class KahaDBStore extends MessageDatabase implements PersistenceAdapter {
for (Iterator<Entry<Long, MessageKeys>> iterator = sd.orderIndex.iterator(tx); iterator
.hasNext();) {
Entry<Long, MessageKeys> entry = iterator.next();
if (ackedAndPrepared.contains(entry.getValue().messageId)) {
continue;
}
Message msg = loadMessage(entry.getValue().location);
listener.recoverMessage(msg);
}
@ -483,6 +486,9 @@ public class KahaDBStore extends MessageDatabase implements PersistenceAdapter {
for (Iterator<Entry<Long, MessageKeys>> iterator = sd.orderIndex.iterator(tx);
listener.hasSpace() && iterator.hasNext(); ) {
entry = iterator.next();
if (ackedAndPrepared.contains(entry.getValue().messageId)) {
continue;
}
Message msg = loadMessage(entry.getValue().location);
listener.recoverMessage(msg);
counter++;

View File

@ -26,7 +26,6 @@ import java.util.concurrent.CancellationException;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import javax.transaction.xa.XAException;
import org.apache.activemq.broker.ConnectionContext;
import org.apache.activemq.command.Message;
import org.apache.activemq.command.MessageAck;
@ -268,6 +267,7 @@ public class KahaDBTransactionStore implements TransactionStore {
// ensure message order w.r.t to cursor and store for setBatch()
synchronized (this) {
theStore.store(new KahaCommitCommand().setTransactionInfo(info), true, preCommit, postCommit);
forgetRecoveredAcks(txid);
}
}
}else {
@ -283,11 +283,19 @@ public class KahaDBTransactionStore implements TransactionStore {
if (txid.isXATransaction() || theStore.isConcurrentStoreAndDispatchTransactions() == false) {
KahaTransactionInfo info = getTransactionInfo(txid);
theStore.store(new KahaRollbackCommand().setTransactionInfo(info), false, null, null);
forgetRecoveredAcks(txid);
} else {
inflightTransactions.remove(txid);
}
}
protected void forgetRecoveredAcks(TransactionId txid) throws IOException {
if (txid.isXATransaction()) {
XATransactionId xaTid = ((XATransactionId) txid);
theStore.forgetRecoveredAcks(xaTid.getPreparedAcks());
}
}
public void start() throws Exception {
}
@ -295,8 +303,6 @@ public class KahaDBTransactionStore implements TransactionStore {
}
public synchronized void recover(TransactionRecoveryListener listener) throws IOException {
// All the inflight transactions get rolled back..
// inflightTransactions.clear();
for (Map.Entry<TransactionId, List<Operation>> entry : theStore.preparedTransactions.entrySet()) {
XATransactionId xid = (XATransactionId) entry.getKey();
ArrayList<Message> messageList = new ArrayList<Message>();
@ -320,6 +326,8 @@ public class KahaDBTransactionStore implements TransactionStore {
MessageAck[] acks = new MessageAck[ackList.size()];
messageList.toArray(addedMessages);
ackList.toArray(acks);
xid.setPreparedAcks(ackList);
theStore.trackRecoveredAcks(ackList);
listener.recover(xid, addedMessages, acks);
}
}

View File

@ -36,8 +36,10 @@ import java.util.concurrent.locks.ReentrantReadWriteLock;
import org.apache.activemq.ActiveMQMessageAuditNoSync;
import org.apache.activemq.broker.BrokerService;
import org.apache.activemq.broker.BrokerServiceAware;
import org.apache.activemq.command.ActiveMQDestination;
import org.apache.activemq.command.ConnectionId;
import org.apache.activemq.command.LocalTransactionId;
import org.apache.activemq.command.MessageAck;
import org.apache.activemq.command.MessageId;
import org.apache.activemq.command.SubscriptionInfo;
import org.apache.activemq.command.TransactionId;
@ -1719,7 +1721,35 @@ public class MessageDatabase extends ServiceSupport implements BrokerServiceAwar
// /////////////////////////////////////////////////////////////////
protected final LinkedHashMap<TransactionId, List<Operation>> inflightTransactions = new LinkedHashMap<TransactionId, List<Operation>>();
protected final LinkedHashMap<TransactionId, List<Operation>> preparedTransactions = new LinkedHashMap<TransactionId, List<Operation>>();
protected final Set<String> ackedAndPrepared = new HashSet<String>();
// messages that have prepared (pending) acks cannot be redispatched unless the outcome is rollback,
// till then they are skipped by the store.
// 'at most once' XA guarantee
public void trackRecoveredAcks(ArrayList<MessageAck> acks) {
this.indexLock.writeLock().lock();
try {
for (MessageAck ack : acks) {
ackedAndPrepared.add(ack.getLastMessageId().toString());
}
} finally {
this.indexLock.writeLock().unlock();
}
}
public void forgetRecoveredAcks(ArrayList<MessageAck> acks) throws IOException {
if (acks != null) {
this.indexLock.writeLock().lock();
try {
for (MessageAck ack : acks) {
ackedAndPrepared.remove(ack.getLastMessageId().toString());
}
} finally {
this.indexLock.writeLock().unlock();
}
}
}
private List<Operation> getInflightTx(KahaTransactionInfo info, Location location) {
TransactionId key = key(info);
List<Operation> tx;

View File

@ -52,7 +52,7 @@ public class XATransaction extends Transaction {
@Override
public void commit(boolean onePhase) throws XAException, IOException {
if (LOG.isDebugEnabled()) {
LOG.debug("XA Transaction commit: " + xid);
LOG.debug("XA Transaction commit onePhase:" + onePhase + ", xid: " + xid);
}
switch (getState()) {

View File

@ -19,13 +19,13 @@ package org.apache.activemq.broker;
import java.io.File;
import java.io.IOException;
import java.net.URISyntaxException;
import org.apache.activemq.broker.region.policy.PolicyEntry;
import org.apache.activemq.broker.region.policy.PolicyMap;
import org.apache.activemq.store.PersistenceAdapter;
import org.apache.activemq.util.IOHelper;
public class BrokerRestartTestSupport extends BrokerTestSupport {
private PersistenceAdapter persistenceAdapter;
@Override
protected BrokerService createBroker() throws Exception {
BrokerService broker = new BrokerService();
@ -33,9 +33,8 @@ public class BrokerRestartTestSupport extends BrokerTestSupport {
if (dir != null) {
IOHelper.deleteChildren(dir);
}
//broker.setPersistent(false);
broker.setDeleteAllMessagesOnStartup(true);
persistenceAdapter = broker.getPersistenceAdapter();
configureBroker(broker);
return broker;
}
@ -45,10 +44,13 @@ public class BrokerRestartTestSupport extends BrokerTestSupport {
*/
protected BrokerService createRestartedBroker() throws Exception {
BrokerService broker = new BrokerService();
//broker.setPersistenceAdapter(persistenceAdapter);
configureBroker(broker);
return broker;
}
protected void configureBroker(BrokerService broker) {
}
/**
* Simulates a broker restart. The memory based persistence adapter is
* reused so that it does not "loose" it's "persistent" messages.

View File

@ -543,13 +543,15 @@ public class RecoveryBrokerTest extends BrokerRestartTestSupport {
// Begin the transaction.
XATransactionId txid = createXATransaction(sessionInfo);
connection.send(createBeginTransaction(connectionInfo, txid));
Message m = null;
for (int i = 0; i < NUMBER; i++) {
Message m = receiveMessage(connection);
m = receiveMessage(connection);
assertNotNull(m);
MessageAck ack = createAck(consumerInfo, m, 1, MessageAck.STANDARD_ACK_TYPE);
ack.setTransactionId(txid);
connection.send(ack);
}
MessageAck ack = createAck(consumerInfo, m, NUMBER, MessageAck.STANDARD_ACK_TYPE);
ack.setTransactionId(txid);
connection.send(ack);
// Don't commit
// restart the broker.
@ -566,7 +568,7 @@ public class RecoveryBrokerTest extends BrokerRestartTestSupport {
// All messages should be re-delivered.
for (int i = 0; i < NUMBER; i++) {
Message m = receiveMessage(connection);
m = receiveMessage(connection);
assertNotNull(m);
}

View File

@ -18,6 +18,8 @@ package org.apache.activemq.broker;
import junit.framework.Test;
import org.apache.activemq.broker.region.policy.PolicyEntry;
import org.apache.activemq.broker.region.policy.PolicyMap;
import org.apache.activemq.command.ActiveMQDestination;
import org.apache.activemq.command.ActiveMQQueue;
import org.apache.activemq.command.ConnectionInfo;
@ -31,6 +33,7 @@ import org.apache.activemq.command.SessionInfo;
import org.apache.activemq.command.TransactionId;
import org.apache.activemq.command.TransactionInfo;
import org.apache.activemq.command.XATransactionId;
import org.apache.activemq.openwire.v5.MessageMarshaller;
/**
* Used to simulate the recovery that occurs when a broker shuts down.
@ -70,13 +73,14 @@ public class XARecoveryBrokerTest extends BrokerRestartTestSupport {
}
// Since prepared but not committed.. they should not get delivered.
assertNull(receiveMessage(connection));
assertNoMessagesLeft(connection);
connection.request(closeConnectionInfo(connectionInfo));
// restart the broker.
restartBroker();
// Setup the consumer and receive the message.
// Setup the consumer and try receive the message.
connection = createConnection();
connectionInfo = createConnectionInfo();
sessionInfo = createSessionInfo(connectionInfo);
@ -86,6 +90,7 @@ public class XARecoveryBrokerTest extends BrokerRestartTestSupport {
connection.send(consumerInfo);
// Since prepared but not committed.. they should not get delivered.
assertNull(receiveMessage(connection));
assertNoMessagesLeft(connection);
Response response = connection.request(new TransactionInfo(connectionInfo.getConnectionId(), null, TransactionInfo.RECOVER));
@ -98,7 +103,7 @@ public class XARecoveryBrokerTest extends BrokerRestartTestSupport {
connection.send(createCommitTransaction2Phase(connectionInfo, (TransactionId)dar.getData()[i]));
}
// We should not get the committed transactions.
// We should get the committed transactions.
for (int i = 0; i < 4; i++) {
Message m = receiveMessage(connection);
assertNotNull(m);
@ -180,13 +185,16 @@ public class XARecoveryBrokerTest extends BrokerRestartTestSupport {
// Begin the transaction.
XATransactionId txid = createXATransaction(sessionInfo);
connection.send(createBeginTransaction(connectionInfo, txid));
Message m = null;
for (int i = 0; i < 4; i++) {
Message m = receiveMessage(connection);
m = receiveMessage(connection);
assertNotNull(m);
MessageAck ack = createAck(consumerInfo, m, 1, MessageAck.STANDARD_ACK_TYPE);
ack.setTransactionId(txid);
connection.send(ack);
}
MessageAck ack = createAck(consumerInfo, m, 4, MessageAck.STANDARD_ACK_TYPE);
ack.setTransactionId(txid);
connection.send(ack);
// Commit
connection.request(createCommitTransaction1Phase(connectionInfo, txid));
@ -205,7 +213,7 @@ public class XARecoveryBrokerTest extends BrokerRestartTestSupport {
// No messages should be delivered.
assertNoMessagesLeft(connection);
Message m = receiveMessage(connection);
m = receiveMessage(connection);
assertNull(m);
}
@ -235,35 +243,133 @@ public class XARecoveryBrokerTest extends BrokerRestartTestSupport {
// Begin the transaction.
XATransactionId txid = createXATransaction(sessionInfo);
connection.send(createBeginTransaction(connectionInfo, txid));
Message m = null;
for (int i = 0; i < 4; i++) {
Message m = receiveMessage(connection);
m = receiveMessage(connection);
assertNotNull(m);
MessageAck ack = createAck(consumerInfo, m, 1, MessageAck.STANDARD_ACK_TYPE);
ack.setTransactionId(txid);
connection.send(ack);
}
// one ack with last received, mimic a beforeEnd synchronization
MessageAck ack = createAck(consumerInfo, m, 4, MessageAck.STANDARD_ACK_TYPE);
ack.setTransactionId(txid);
connection.send(ack);
connection.request(createPrepareTransaction(connectionInfo, txid));
// restart the broker.
restartBroker();
// Setup the consumer and receive the message.
connection = createConnection();
connectionInfo = createConnectionInfo();
sessionInfo = createSessionInfo(connectionInfo);
connection.send(connectionInfo);
// validate recovery
TransactionInfo recoverInfo = new TransactionInfo(connectionInfo.getConnectionId(), null, TransactionInfo.RECOVER);
DataArrayResponse dataArrayResponse = (DataArrayResponse)connection.request(recoverInfo);
assertEquals("there is a prepared tx", 1, dataArrayResponse.getData().length);
assertEquals("it matches", txid, dataArrayResponse.getData()[0]);
sessionInfo = createSessionInfo(connectionInfo);
connection.send(sessionInfo);
consumerInfo = createConsumerInfo(sessionInfo, destination);
connection.send(consumerInfo);
// All messages should be re-delivered.
// no redelivery, exactly once semantics unless there is rollback
m = receiveMessage(connection);
assertNull(m);
assertNoMessagesLeft(connection);
connection.request(createCommitTransaction2Phase(connectionInfo, txid));
// validate recovery complete
dataArrayResponse = (DataArrayResponse)connection.request(recoverInfo);
assertEquals("there are no prepared tx", 0, dataArrayResponse.getData().length);
}
public void testQueuePersistentPreparedAcksAvailableAfterRestartAndRollback() throws Exception {
ActiveMQDestination destination = createDestination();
// Setup the producer and send the message.
StubConnection connection = createConnection();
ConnectionInfo connectionInfo = createConnectionInfo();
SessionInfo sessionInfo = createSessionInfo(connectionInfo);
ProducerInfo producerInfo = createProducerInfo(sessionInfo);
connection.send(connectionInfo);
connection.send(sessionInfo);
connection.send(producerInfo);
for (int i = 0; i < 4; i++) {
Message m = receiveMessage(connection);
assertNotNull(m);
Message message = createMessage(producerInfo, destination);
message.setPersistent(true);
connection.send(message);
}
// Setup the consumer and receive the message.
ConsumerInfo consumerInfo = createConsumerInfo(sessionInfo, destination);
connection.send(consumerInfo);
// Begin the transaction.
XATransactionId txid = createXATransaction(sessionInfo);
connection.send(createBeginTransaction(connectionInfo, txid));
Message message = null;
for (int i = 0; i < 4; i++) {
message = receiveMessage(connection);
assertNotNull(message);
}
// one ack with last received, mimic a beforeEnd synchronization
MessageAck ack = createAck(consumerInfo, message, 4, MessageAck.STANDARD_ACK_TYPE);
ack.setTransactionId(txid);
connection.send(ack);
connection.request(createPrepareTransaction(connectionInfo, txid));
// restart the broker.
restartBroker();
connection = createConnection();
connectionInfo = createConnectionInfo();
connection.send(connectionInfo);
// validate recovery
TransactionInfo recoverInfo = new TransactionInfo(connectionInfo.getConnectionId(), null, TransactionInfo.RECOVER);
DataArrayResponse dataArrayResponse = (DataArrayResponse)connection.request(recoverInfo);
assertEquals("there is a prepared tx", 1, dataArrayResponse.getData().length);
assertEquals("it matches", txid, dataArrayResponse.getData()[0]);
sessionInfo = createSessionInfo(connectionInfo);
connection.send(sessionInfo);
consumerInfo = createConsumerInfo(sessionInfo, destination);
connection.send(consumerInfo);
// no redelivery, exactly once semantics while prepared
message = receiveMessage(connection);
assertNull(message);
assertNoMessagesLeft(connection);
// rollback so we get redelivery
connection.request(createRollbackTransaction(connectionInfo, txid));
// Begin new transaction for redelivery
txid = createXATransaction(sessionInfo);
connection.send(createBeginTransaction(connectionInfo, txid));
for (int i = 0; i < 4; i++) {
message = receiveMessage(connection);
assertNotNull(message);
}
ack = createAck(consumerInfo, message, 4, MessageAck.STANDARD_ACK_TYPE);
ack.setTransactionId(txid);
connection.send(ack);
// Commit
connection.request(createCommitTransaction1Phase(connectionInfo, txid));
// validate recovery complete
dataArrayResponse = (DataArrayResponse)connection.request(recoverInfo);
assertEquals("there are no prepared tx", 0, dataArrayResponse.getData().length);
}
public void testQueuePersistentUncommittedAcksLostOnRestart() throws Exception {
@ -292,13 +398,14 @@ public class XARecoveryBrokerTest extends BrokerRestartTestSupport {
// Begin the transaction.
XATransactionId txid = createXATransaction(sessionInfo);
connection.send(createBeginTransaction(connectionInfo, txid));
Message m = null;
for (int i = 0; i < 4; i++) {
Message m = receiveMessage(connection);
m = receiveMessage(connection);
assertNotNull(m);
MessageAck ack = createAck(consumerInfo, m, 1, MessageAck.STANDARD_ACK_TYPE);
ack.setTransactionId(txid);
connection.send(ack);
}
MessageAck ack = createAck(consumerInfo, m, 4, MessageAck.STANDARD_ACK_TYPE);
ack.setTransactionId(txid);
connection.send(ack);
// Don't commit
// restart the broker.
@ -315,7 +422,7 @@ public class XARecoveryBrokerTest extends BrokerRestartTestSupport {
// All messages should be re-delivered.
for (int i = 0; i < 4; i++) {
Message m = receiveMessage(connection);
m = receiveMessage(connection);
assertNotNull(m);
}

View File

@ -33,6 +33,9 @@ import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
import javax.net.ssl.SSLServerSocket;
import javax.net.ssl.SSLServerSocketFactory;
import javax.net.ssl.SSLSocketFactory;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@ -82,7 +85,7 @@ public class SocketProxy {
}
public void open() throws Exception {
serverSocket = new ServerSocket();
serverSocket = createServerSocket(target);
serverSocket.setReuseAddress(true);
if (receiveBufferSize > 0) {
serverSocket.setReceiveBufferSize(receiveBufferSize);
@ -101,6 +104,24 @@ public class SocketProxy {
closed = new CountDownLatch(1);
}
private boolean isSsl(URI target) {
return "ssl".equals(target.getScheme());
}
private ServerSocket createServerSocket(URI target) throws Exception {
if (isSsl(target)) {
return SSLServerSocketFactory.getDefault().createServerSocket();
}
return new ServerSocket();
}
private Socket createSocket(URI target) throws Exception {
if (isSsl(target)) {
return SSLSocketFactory.getDefault().createSocket();
}
return new Socket();
}
public URI getUrl() {
return proxyUrl;
}
@ -226,7 +247,7 @@ public class SocketProxy {
public Bridge(Socket socket, URI target) throws Exception {
receiveSocket = socket;
sendSocket = new Socket();
sendSocket = createSocket(target);
if (receiveBufferSize > 0) {
sendSocket.setReceiveBufferSize(receiveBufferSize);
}

View File

@ -21,8 +21,11 @@ import java.io.IOException;
import javax.jms.ConnectionFactory;
import javax.jms.Session;
import javax.jms.JMSException;
import javax.transaction.SystemException;
import javax.transaction.TransactionManager;
import javax.transaction.xa.XAResource;
import org.apache.geronimo.transaction.manager.NamedXAResourceFactory;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.activemq.ActiveMQConnectionFactory;
@ -116,16 +119,51 @@ public class ActiveMQResourceManager {
rm.getResourceName() != null && !"".equals(rm.getResourceName());
}
public static boolean recover(ActiveMQResourceManager rm) throws IOException {
public static boolean recover(final ActiveMQResourceManager rm) throws IOException {
if (isRecoverable(rm)) {
try {
ActiveMQConnectionFactory connFactory = (ActiveMQConnectionFactory) rm.getConnectionFactory();
final ActiveMQConnectionFactory connFactory = (ActiveMQConnectionFactory) rm.getConnectionFactory();
ActiveMQConnection activeConn = (ActiveMQConnection)connFactory.createConnection();
ActiveMQSession session = (ActiveMQSession)activeConn.createSession(true, Session.SESSION_TRANSACTED);
final ActiveMQSession session = (ActiveMQSession)activeConn.createSession(true, Session.SESSION_TRANSACTED);
NamedXAResource namedXaResource = new WrapperNamedXAResource(session.getTransactionContext(), rm.getResourceName());
RecoverableTransactionManager rtxManager = (RecoverableTransactionManager) rm.getTransactionManager();
rtxManager.recoverResourceManager(namedXaResource);
rtxManager.registerNamedXAResourceFactory(new NamedXAResourceFactory() {
@Override
public String getName() {
return rm.getResourceName();
}
@Override
public NamedXAResource getNamedXAResource() throws SystemException {
try {
final ActiveMQConnection activeConn = (ActiveMQConnection)connFactory.createConnection();
final ActiveMQSession session = (ActiveMQSession)activeConn.createSession(true, Session.SESSION_TRANSACTED);
activeConn.start();
LOGGER.debug("new namedXAResource's connection: " + activeConn);
return new ConnectionAndWrapperNamedXAResource(session.getTransactionContext(), getName(), activeConn);
} catch (Exception e) {
SystemException se = new SystemException("Failed to create ConnectionAndWrapperNamedXAResource, " + e.getLocalizedMessage());
se.initCause(e);
LOGGER.error(se.getLocalizedMessage(), se);
throw se;
}
}
@Override
public void returnNamedXAResource(NamedXAResource namedXaResource) {
if (namedXaResource instanceof ConnectionAndWrapperNamedXAResource) {
try {
LOGGER.debug("closing returned namedXAResource's connection: " + ((ConnectionAndWrapperNamedXAResource)namedXaResource).connection);
((ConnectionAndWrapperNamedXAResource)namedXaResource).connection.close();
} catch (Exception ignored) {
LOGGER.debug("failed to close returned namedXAResource: " + namedXaResource, ignored);
}
}
}
});
return true;
} catch (JMSException e) {
throw IOExceptionSupport.create(e);
@ -136,4 +174,11 @@ public class ActiveMQResourceManager {
}
}
public static class ConnectionAndWrapperNamedXAResource extends WrapperNamedXAResource {
final ActiveMQConnection connection;
public ConnectionAndWrapperNamedXAResource(XAResource xaResource, String name, ActiveMQConnection connection) {
super(xaResource, name);
this.connection = connection;
}
}
}

View File

@ -90,42 +90,52 @@ public class PooledSession implements Session, TopicSession, QueueSession, XASes
if (!ignoreClose) {
// TODO a cleaner way to reset??
// lets reset the session
getInternalSession().setMessageListener(null);
boolean invalidate = false;
try {
// lets reset the session
getInternalSession().setMessageListener(null);
// Close any consumers and browsers that may have been created.
for (Iterator<MessageConsumer> iter = consumers.iterator(); iter.hasNext();) {
MessageConsumer consumer = iter.next();
consumer.close();
// Close any consumers and browsers that may have been created.
for (Iterator<MessageConsumer> iter = consumers.iterator(); iter.hasNext();) {
MessageConsumer consumer = iter.next();
consumer.close();
}
for (Iterator<QueueBrowser> iter = browsers.iterator(); iter.hasNext();) {
QueueBrowser browser = iter.next();
browser.close();
}
if (transactional && !isXa) {
try {
getInternalSession().rollback();
} catch (JMSException e) {
invalidate = true;
LOG.warn("Caught exception trying rollback() when putting session back into the pool, will invalidate. " + e, e);
}
}
} catch (JMSException ex) {
invalidate = true;
LOG.warn("Caught exception trying close() when putting session back into the pool, will invalidate. " + ex, ex);
} finally {
consumers.clear();
browsers.clear();
}
consumers.clear();
for (Iterator<QueueBrowser> iter = browsers.iterator(); iter.hasNext();) {
QueueBrowser browser = iter.next();
browser.close();
}
browsers.clear();
if (transactional && !isXa) {
try {
getInternalSession().rollback();
} catch (JMSException e) {
LOG.warn("Caught exception trying rollback() when putting session back into the pool: " + e, e);
// lets close the session and not put the session back into
// the pool
if (invalidate) {
// lets close the session and not put the session back into
// the pool
if (session != null) {
try {
session.close();
} catch (JMSException e1) {
LOG.trace("Ignoring exception as discarding session: " + e1, e1);
LOG.trace("Ignoring exception on close as discarding session: " + e1, e1);
}
session = null;
sessionPool.invalidateSession(this);
return;
}
sessionPool.invalidateSession(this);
} else {
sessionPool.returnSession(this);
}
sessionPool.returnSession(this);
}
}

View File

@ -899,7 +899,7 @@
<dependency>
<groupId>org.apache.geronimo.components</groupId>
<artifactId>geronimo-transaction</artifactId>
<version>2.1</version>
<version>2.2.1</version>
</dependency>
<!-- FTP support for BlobMessages -->