test for transacted send and ack camel route with explicit set of connection factory necessary due to config clone across component endpoints

This commit is contained in:
gtully 2015-11-20 13:22:36 +00:00
parent 3a3dcac184
commit 48fbd3116d
2 changed files with 157 additions and 0 deletions

View File

@ -0,0 +1,100 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.camel;
import javax.jms.Connection;
import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.MessageProducer;
import javax.jms.Session;
import javax.jms.TextMessage;
import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.activemq.broker.BrokerService;
import org.apache.activemq.command.ActiveMQQueue;
import org.apache.camel.test.spring.CamelSpringTestSupport;
import org.junit.Test;
import org.springframework.context.support.AbstractXmlApplicationContext;
import org.springframework.context.support.ClassPathXmlApplicationContext;
public class JmsConsumeSendTransacted extends CamelSpringTestSupport {
BrokerService broker = null;
int messageCount;
@Test
public void testTransactedRoute() throws Exception {
sendJMSMessageToKickOffRoute();
// camel route will use a single transaction for send and and ack
consumeMessages();
}
private void consumeMessages() throws Exception {
ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory("vm://testTran");
factory.setWatchTopicAdvisories(false);
Connection connection = factory.createConnection();
connection.start();
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
MessageConsumer consumer = session.createConsumer(new ActiveMQQueue("to"));
int messagesToConsume = messageCount;
while (messagesToConsume > 0) {
Message message = consumer.receive(5000);
if (message != null) {
messagesToConsume--;
}
}
}
private void sendJMSMessageToKickOffRoute() throws Exception {
ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory("vm://testTran");
factory.setWatchTopicAdvisories(false);
Connection connection = factory.createConnection();
connection.start();
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
MessageProducer producer = session.createProducer(new ActiveMQQueue("from"));
TextMessage message = session.createTextMessage("Some Text, messageCount:" + messageCount++);
message.setIntProperty("seq", messageCount);
producer.send(message);
connection.close();
}
private BrokerService createBroker(boolean deleteAllMessages) throws Exception {
BrokerService brokerService = new BrokerService();
brokerService.setDeleteAllMessagesOnStartup(deleteAllMessages);
brokerService.setBrokerName("testTran");
brokerService.setAdvisorySupport(false);
brokerService.setUseJmx(false);
brokerService.setDataDirectory("target/data");
brokerService.addConnector("tcp://0.0.0.0:61616");
return brokerService;
}
@SuppressWarnings("unchecked")
@Override
protected AbstractXmlApplicationContext createApplicationContext() {
try {
broker = createBroker(true);
broker.start();
} catch (Exception e) {
throw new RuntimeException("Failed to start broker", e);
}
return new ClassPathXmlApplicationContext("org/apache/activemq/camel/jmsConsumeSendTransacted.xml");
}
}

View File

@ -0,0 +1,57 @@
<!--
Licensed to the Apache Software Foundation (ASF) under one or more
contributor license agreements. See the NOTICE file distributed with
this work for additional information regarding copyright ownership.
The ASF licenses this file to You under the Apache License, Version 2.0
(the "License"); you may not use this file except in compliance with
the License. You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
-->
<beans xmlns="http://www.springframework.org/schema/beans"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xmlns:context="http://www.springframework.org/schema/context"
xsi:schemaLocation="
http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-3.0.xsd
http://www.springframework.org/schema/context http://www.springframework.org/schema/context/spring-context-3.0.xsd
http://camel.apache.org/schema/spring http://camel.apache.org/schema/spring/camel-spring.xsd
">
<context:annotation-config />
<bean id="cf" class="org.apache.activemq.ActiveMQConnectionFactory">
<property name="brokerURL" value="failover:(tcp://localhost:61616)"/>
</bean>
<bean id="pooledCf" class="org.apache.activemq.pool.PooledConnectionFactory">
<!-- match maxConnections to the number of routes that share the connection factory -->
<property name="maxConnections" value="10"/>
<!-- match maximumActive (which is active sessions) >= concurrentConsumers in the MLC -->
<property name="maximumActiveSessionPerConnection" value="1"/>
<property name="connectionFactory" ref="cf"/>
</bean>
<bean id="activemq" class="org.apache.activemq.camel.component.ActiveMQComponent">
<!-- set cf - so that inbound and outbound endpoints share the connection pool and transacted session -->
<property name="connectionFactory" ref="pooledCf" />
<property name="transacted" value="true" />
<property name="concurrentConsumers" value="1" />
</bean>
<camelContext xmlns="http://camel.apache.org/schema/spring" id="camel-queue-transporter">
<route id="move-route">
<from uri="activemq:queue:from" />
<to uri="activemq:queue:to" />
</route>
</camelContext>
</beans>