add a sanity external tm xa rollback test

This commit is contained in:
gtully 2014-04-01 22:44:54 +01:00
parent 1999ddfd01
commit 10394734f6
2 changed files with 305 additions and 0 deletions

View File

@ -0,0 +1,185 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.camel;
import java.sql.ResultSet;
import java.sql.SQLException;
import javax.jms.Connection;
import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.MessageProducer;
import javax.jms.Session;
import javax.jms.TextMessage;
import javax.transaction.TransactionManager;
import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.activemq.broker.BrokerService;
import org.apache.activemq.broker.region.policy.SharedDeadLetterStrategy;
import org.apache.activemq.command.ActiveMQQueue;
import org.apache.activemq.util.Wait;
import org.apache.camel.Exchange;
import org.apache.camel.component.jms.JmsMessage;
import org.apache.camel.test.spring.CamelSpringTestSupport;
import org.apache.commons.dbcp.BasicDataSource;
import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.context.support.AbstractXmlApplicationContext;
import org.springframework.context.support.ClassPathXmlApplicationContext;
import org.springframework.transaction.jta.JtaTransactionManager;
/**
* shows rollback and redelivery dlq respected with external tm
*/
public class JmsJdbcXARollbackTest extends CamelSpringTestSupport {
private static final Logger LOG = LoggerFactory.getLogger(JmsJdbcXARollbackTest.class);
BrokerService broker = null;
int messageCount;
public java.sql.Connection initDb() throws Exception {
String createStatement =
"CREATE TABLE SCP_INPUT_MESSAGES (" +
"id int NOT NULL GENERATED ALWAYS AS IDENTITY, " +
"messageId varchar(96) NOT NULL, " +
"messageCorrelationId varchar(96) NOT NULL, " +
"messageContent varchar(2048) NOT NULL, " +
"PRIMARY KEY (id) )";
java.sql.Connection conn = getJDBCConnection();
try {
conn.createStatement().execute(createStatement);
} catch (SQLException alreadyExists) {
log.info("ex on create tables", alreadyExists);
}
try {
conn.createStatement().execute("DELETE FROM SCP_INPUT_MESSAGES");
} catch (SQLException ex) {
log.info("ex on create delete all", ex);
}
return conn;
}
private java.sql.Connection getJDBCConnection() throws Exception {
BasicDataSource dataSource = getMandatoryBean(BasicDataSource.class, "managedDataSourceWithRecovery");
return dataSource.getConnection();
}
private int dumpDb(java.sql.Connection jdbcConn) throws Exception {
int count = 0;
ResultSet resultSet = jdbcConn.createStatement().executeQuery("SELECT * FROM SCP_INPUT_MESSAGES");
while (resultSet.next()) {
count++;
log.info("message - seq:" + resultSet.getInt(1)
+ ", id: " + resultSet.getString(2)
+ ", corr: " + resultSet.getString(3)
+ ", content: " + resultSet.getString(4));
}
return count;
}
@Test
public void testConsumeRollback() throws Exception {
java.sql.Connection jdbcConn = initDb();
initTMRef();
sendJMSMessageToKickOffRoute();
// should go to dlq eventually
Wait.waitFor(new Wait.Condition() {
@Override
public boolean isSatisified() throws Exception {
return consumedFrom(SharedDeadLetterStrategy.DEFAULT_DEAD_LETTER_QUEUE_NAME);
}
});
assertEquals("message in db, commit to db worked", 0, dumpDb(jdbcConn));
assertFalse("Nothing to to out q", consumedFrom("scp_transacted_out"));
}
private boolean consumedFrom(String qName) throws Exception {
ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory("vm://testXA");
factory.setWatchTopicAdvisories(false);
Connection connection = factory.createConnection();
connection.start();
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
MessageConsumer consumer = session.createConsumer(new ActiveMQQueue(qName));
Message message = consumer.receive(500);
LOG.info("Got from queue:{} {}", qName, message);
connection.close();
return message != null;
}
static TransactionManager[] transactionManager = new TransactionManager[1];
private void initTMRef() {
transactionManager[0] = getMandatoryBean(JtaTransactionManager.class, "jtaTransactionManager").getTransactionManager();
}
private void sendJMSMessageToKickOffRoute() throws Exception {
ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory("vm://testXA");
factory.setWatchTopicAdvisories(false);
Connection connection = factory.createConnection();
connection.start();
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
MessageProducer producer = session.createProducer(new ActiveMQQueue("scp_transacted"));
TextMessage message = session.createTextMessage("Some Text, messageCount:" + messageCount++);
message.setJMSCorrelationID("pleaseCorrelate");
producer.send(message);
connection.close();
}
private BrokerService createBroker(boolean deleteAllMessages) throws Exception {
BrokerService brokerService = new BrokerService();
brokerService.setDeleteAllMessagesOnStartup(deleteAllMessages);
brokerService.setBrokerName("testXA");
brokerService.setAdvisorySupport(false);
brokerService.setUseJmx(false);
brokerService.setDataDirectory("target/data");
brokerService.addConnector("tcp://0.0.0.0:61616");
return brokerService;
}
@SuppressWarnings("unchecked")
@Override
protected AbstractXmlApplicationContext createApplicationContext() {
deleteDirectory("target/data/howl");
// make broker available to recovery processing on app context start
try {
broker = createBroker(true);
broker.start();
} catch (Exception e) {
throw new RuntimeException("Failed to start broker", e);
}
return new ClassPathXmlApplicationContext("org/apache/activemq/camel/jmsXajdbcRollback.xml");
}
public static class MarkRollbackOnly {
public String enrich(Exchange exchange) throws Exception {
LOG.info("Got exchange: " + exchange);
LOG.info("Got message: " + ((JmsMessage)exchange.getIn()).getJmsMessage());
LOG.info("Current tx: " + transactionManager[0].getTransaction());
LOG.info("Marking rollback only...");
transactionManager[0].getTransaction().setRollbackOnly();
return "Some Text";
}
}
}

View File

@ -0,0 +1,120 @@
<!--
Licensed to the Apache Software Foundation (ASF) under one or more
contributor license agreements. See the NOTICE file distributed with
this work for additional information regarding copyright ownership.
The ASF licenses this file to You under the Apache License, Version 2.0
(the "License"); you may not use this file except in compliance with
the License. You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
-->
<!-- START SNIPPET: jms_jdbc_xa -->
<beans xmlns="http://www.springframework.org/schema/beans"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xmlns:context="http://www.springframework.org/schema/context"
xsi:schemaLocation="
http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-3.0.xsd
http://www.springframework.org/schema/context http://www.springframework.org/schema/context/spring-context-3.0.xsd
http://camel.apache.org/schema/spring http://camel.apache.org/schema/spring/camel-spring.xsd
http://activemq.apache.org/schema/core http://activemq.apache.org/schema/core/activemq-core.xsd
">
<context:annotation-config />
<!-- broker creation in code so it can be restarted and modified to test recovery -->
<!-- use jencks factory beans to easily configure howl and geronimo transaction manager -->
<bean id="xidFactory" class="org.apache.geronimo.transaction.manager.XidFactoryImpl"/>
<!-- Transaction log -->
<bean id="transactionLog" class="org.jencks.factory.HowlLogFactoryBean">
<property name="logFileDir" value="target/data/howl/txlog"/>
<property name="xidFactory" ref="xidFactory"/>
</bean>
<bean id="jenckTransactionManager" class="org.jencks.factory.TransactionManagerFactoryBean">
<property name="transactionLog" ref="transactionLog"/>
</bean>
<bean id="activemqConnectionFactory" class="org.apache.activemq.ActiveMQXAConnectionFactory">
<property name="brokerURL" value="tcp://localhost:61616?jms.dispatchAsync=false&amp;jms.redeliveryPolicy.maximumRedeliveries=2&amp;jms.redeliveryPolicy.initialRedeliveryDelay=100"/>
</bean>
<!-- register ActiveMQ with Geronimo to allow out of band transaction recovery/completion on a new connection
the resourceName gives the ActiveMQ XAResource an identity, Geronimo NamedXAResource in the transaction log
-->
<bean id="resourceManager" class="org.apache.activemq.pool.ActiveMQResourceManager" init-method="recoverResource">
<property name="transactionManager" ref="jenckTransactionManager"/>
<property name="connectionFactory" ref="activemqConnectionFactory"/>
<property name="resourceName" value="activemq.broker"/>
</bean>
<bean id="pooledConnectionFactory" class="org.apache.activemq.pool.PooledConnectionFactoryBean"
depends-on="jenckTransactionManager">
<property name="maxConnections" value="1"/>
<property name="transactionManager" ref="jenckTransactionManager"/>
<property name="connectionFactory" ref="activemqConnectionFactory"/>
<property name="resourceName" value="activemq.broker"/>
</bean>
<!-- Configure the Spring framework (used by camel) to use JTA transactions from Geronimo -->
<bean id="jtaTransactionManager" class="org.springframework.transaction.jta.JtaTransactionManager">
<property name="transactionManager" ref="jenckTransactionManager"/>
</bean>
<!-- Define the activemq Camel component so we can integrate with the AMQ broker -->
<bean id="activemq" class="org.apache.activemq.camel.component.ActiveMQComponent"
depends-on="pooledConnectionFactory">
<property name="transacted" value="true"/>
<property name="transactionManager" ref="jtaTransactionManager"/>
<property name="connectionFactory" ref="pooledConnectionFactory"/>
<!-- cache level is important, can be cache connection or none, as session needs to be enlisted
in the current transaction they can't be cached, with default cache sessions, they are created
up front, before the transaction (required for the route) -->
<property name="cacheLevel" value="0"/>
</bean>
<!-- openejb provides geronimo NamedXAResources wrapper around commons dbcp such that they have an identity in the howl log -->
<bean id="geronimoXAResourceWrapper"
class="org.apache.openejb.resource.GeronimoTransactionManagerFactory.GeronimoXAResourceWrapper"/>
<bean id="managedDataSourceWithRecovery" class="org.apache.openejb.resource.jdbc.ManagedDataSourceWithRecovery">
<constructor-arg>
<ref bean="geronimoXAResourceWrapper"></ref>
</constructor-arg>
<property name="jdbcDriver" value="org.apache.derby.jdbc.EmbeddedDriver"/>
<property name="jdbcUrl" value="jdbc:derby:target/XatestDs;create=true"/>
<property name="transactionManager" ref="jenckTransactionManager"/>
</bean>
<bean id="required" class="org.apache.camel.spring.spi.SpringTransactionPolicy">
<property name="transactionManager" ref="jenckTransactionManager"/>
<property name="propagationBehaviorName" value="PROPAGATION_REQUIRED"/>
</bean>
<bean id="markRollback" class="org.apache.activemq.camel.JmsJdbcXARollbackTest.MarkRollbackOnly"/>
<!-- the route, from jms to jdbc in an xa transaction -->
<camelContext id="camel" xmlns="http://camel.apache.org/schema/spring">
<route id="queueToDbTransacted">
<from uri="activemq:queue:scp_transacted"/>
<transacted ref="required"/>
<bean ref="markRollback" method="enrich" />
<convertBodyTo type="java.lang.String"/>
<to uri="log:BeforeSettingBody?showAll=true"/>
<to uri="activemq:queue:scp_transacted_out"/>
<setBody>
<simple>INSERT INTO SCP_INPUT_MESSAGES(messageId, messageCorrelationId, messageContent)
VALUES('${in.header.JMSMessageId}','${in.header.JMSCorrelationId}','${in.body}')
</simple>
</setBody>
<to uri="jdbc:managedDataSourceWithRecovery?resetAutoCommit=false"/>
</route>
</camelContext>
</beans>
<!-- END SNIPPET: jms_jdbc_xa -->