second route is the way to use a second connection and let spring do the caching

git-svn-id: https://svn.apache.org/repos/asf/activemq/trunk@1162175 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Gary Tully 2011-08-26 17:15:46 +00:00
parent 713dcaae56
commit 35384de3a7
2 changed files with 34 additions and 9 deletions

View File

@ -16,6 +16,8 @@
*/
package org.apache.activemq.camel;
import java.io.File;
import java.util.concurrent.atomic.AtomicLong;
import javax.jms.Connection;
import javax.jms.MessageProducer;
import javax.jms.Session;
@ -26,6 +28,7 @@ import org.apache.activemq.broker.region.policy.PolicyEntry;
import org.apache.activemq.broker.region.policy.PolicyMap;
import org.apache.activemq.command.ActiveMQQueue;
import org.apache.activemq.command.ActiveMQTextMessage;
import org.apache.activemq.store.amq.AMQPersistenceAdapter;
import org.apache.activemq.util.Wait;
import org.apache.camel.Exchange;
import org.apache.camel.Processor;
@ -40,7 +43,7 @@ import org.springframework.context.support.ClassPathXmlApplicationContext;
public class TransactedConsumeTest extends CamelSpringTestSupport {
private static final Logger LOG = LoggerFactory.getLogger(TransactedConsumeTest.class);
BrokerService broker = null;
int messageCount = 1000;
int messageCount = 100000;
@Test
public void testConsume() throws Exception {
@ -53,6 +56,8 @@ public class TransactedConsumeTest extends CamelSpringTestSupport {
return broker.getAdminView().getTotalDequeueCount() >= messageCount;
}
}, 20 * 60 * 1000));
long duration = System.currentTimeMillis() - firstConsumed.get();
LOG.info("Done message consumption in " + duration + "millis");
}
private void sendJMSMessageToKickOffRoute() throws Exception {
@ -78,12 +83,14 @@ public class TransactedConsumeTest extends CamelSpringTestSupport {
PolicyMap policyMap = new PolicyMap();
PolicyEntry defaultPolicy = new PolicyEntry();
// defaultPolicy.setStrictOrderDispatch(false);
policyMap.setDefaultEntry(defaultPolicy);
brokerService.setDestinationPolicy(policyMap);
brokerService.setAdvisorySupport(false);
brokerService.setDataDirectory("target/data");
AMQPersistenceAdapter amq = new AMQPersistenceAdapter();
amq.setDirectory(new File("target/data"));
brokerService.setPersistenceAdapter(amq);
brokerService.addConnector("tcp://localhost:61616");
return brokerService;
}
@ -110,13 +117,21 @@ public class TransactedConsumeTest extends CamelSpringTestSupport {
return new ClassPathXmlApplicationContext("org/apache/activemq/camel/transactedconsume.xml");
}
static AtomicLong firstConsumed = new AtomicLong();
static AtomicLong consumed = new AtomicLong();
static class ConnectionLog implements Processor {
@Override
public void process(Exchange exchange) throws Exception {
if (consumed.getAndIncrement() == 0) {
firstConsumed.set(System.currentTimeMillis());
}
ActiveMQTextMessage m = (ActiveMQTextMessage) ((JmsMessage)exchange.getIn()).getJmsMessage();
Thread.currentThread().sleep(10);
LOG.info("received on " + m.getConnection().toString());
//Thread.currentThread().sleep(500);
if (consumed.get() %500 == 0) {
LOG.info("received on " + m.getConnection().toString());
}
}
}

View File

@ -28,12 +28,12 @@
<context:annotation-config/>
<bean id="vhfBatchListenerJMSConnectionFactory" class="org.apache.activemq.ActiveMQConnectionFactory">
<property name="brokerURL" value="tcp://localhost:61616?jms.prefetchPolicy.all=1"/>
<property name="brokerURL" value="tcp://localhost:61616?jms.prefetchPolicy.all=1000"/>
</bean>
<bean id="vhfBatchListenerPooledConnectionFactory" class="org.apache.activemq.pool.PooledConnectionFactory">
<property name="maxConnections" value="10"/>
<property name="maximumActive" value="10"/>
<property name="maxConnections" value="1"/>
<property name="maximumActive" value="1"/>
<property name="connectionFactory" ref="vhfBatchListenerJMSConnectionFactory"/>
</bean>
@ -66,13 +66,23 @@
<property name="configuration" ref="vhfBatchListenerJMSConfig"/>
</bean>
<bean id="activemq2" class="org.apache.activemq.camel.component.ActiveMQComponent">
<property name="configuration" ref="vhfBatchListenerJMSConfig"/>
</bean>
<camelContext xmlns="http://camel.apache.org/schema/spring">
<route>
<from uri="activemq:queue:scp_transacted"/>
<!-- transacted /-->
<process ref="connectonLog"/>
<process ref="connectionLog"/>
</route>
<!-- marginally better through put with a second route/connection -->
<route>
<from uri="activemq2:queue:scp_transacted"/>
<process ref="connectionLog"/>
</route>
</camelContext>
<bean id="connectonLog" class="org.apache.activemq.camel.TransactedConsumeTest.ConnectionLog"/>
<bean id="connectionLog" class="org.apache.activemq.camel.TransactedConsumeTest.ConnectionLog"/>
</beans>