diff --git a/activemq-pool/pom.xml b/activemq-pool/pom.xml index 92fd16be6f..aa9cdf32b2 100755 --- a/activemq-pool/pom.xml +++ b/activemq-pool/pom.xml @@ -94,6 +94,21 @@ spring-jms test + + org.apache.xbean + xbean-spring + test + + + log4j + log4j + test + + + org.apache.derby + derby + test + diff --git a/activemq-pool/src/test/java/org/apache/activemq/usecases/ConsumerThread.java b/activemq-pool/src/test/java/org/apache/activemq/usecases/ConsumerThread.java new file mode 100644 index 0000000000..75623c2e1c --- /dev/null +++ b/activemq-pool/src/test/java/org/apache/activemq/usecases/ConsumerThread.java @@ -0,0 +1,174 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.activemq.usecases; + + +import java.util.Random; + +import javax.jms.ConnectionFactory; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.springframework.jms.connection.SingleConnectionFactory; +import org.springframework.jms.listener.DefaultMessageListenerContainer; + +public class ConsumerThread extends Thread { + private DefaultMessageListenerContainer container; + private MessageDrivenPojo messageListener; + private boolean run; + private String destination; + private ConnectionFactory connectionFactory; + private boolean durable; + private int concurrentConsumers; + private boolean sessionTransacted; + private boolean pubSubDomain; + private boolean running; + private Log log = LogFactory.getLog(ConsumerThread.class); + private int numberOfQueues; + + @Override + public void run() { + run = true; + createContainer(); + container.initialize(); + container.start(); + + running = true; + + while (run) { + try { + Thread.sleep(3000); + } catch (InterruptedException e) { + e.printStackTrace(); + } + } + + container.stop(); + container.destroy(); + + if (connectionFactory instanceof SingleConnectionFactory) { + ((SingleConnectionFactory)connectionFactory).destroy(); + } + + log.info("ConsumerThread1 closing down"); + } + + private DefaultMessageListenerContainer createContainer() { + Random generator = new Random(); + int queueSuffix = generator.nextInt(numberOfQueues); + + + container = new DefaultMessageListenerContainer(); + container.setPubSubDomain(pubSubDomain); + container.setDestinationName(destination + queueSuffix); + container.setMessageListener(messageListener); + container.setConnectionFactory(connectionFactory); + container.setConcurrentConsumers(concurrentConsumers); + container.setSessionTransacted(sessionTransacted); + + //container.setCacheLevel(DefaultMessageListenerContainer.CACHE_CONSUMER); + //container.setMaxConcurrentConsumers(concurrentConsumers); + //container.setAcceptMessagesWhileStopping(false); + //container.setAutoStartup(false); + //without setting a tx manager, this will use local JMS tx. + + /* + if (durable) { + container.setSubscriptionDurable(true); + container.setDurableSubscriptionName("ConsumerThread1" + Thread.currentThread().getId()); + } + */ + container.afterPropertiesSet(); + return container; + } + + /** + * @param messageListener the messageListener to set + */ + public void setMessageDrivenPojo(MessageDrivenPojo messageListener) { + this.messageListener = messageListener; + } + + /** + * @param run the run to set + */ + public void setRun(boolean run) { + this.run = run; + } + + /** + * @param destination the destination to set + */ + public void setDestination(String destination) { + this.destination = destination; + } + + public void setNumberOfQueues(int no) { + this.numberOfQueues = no; + } + + public int getNumberOfQueues() { + return this.numberOfQueues; + } + + /** + * @param connectionFactory the connectionFactory to set + */ + public void setConnectionFactory(ConnectionFactory connectionFactory) { + this.connectionFactory = connectionFactory; + } + + /** + * @param durable the durable to set + */ + public void setDurable(boolean durable) { + this.durable = durable; + } + + /** + * @param concurrentConsumers the concurrentConsumers to set + */ + public void setConcurrentConsumers(int concurrentConsumers) { + this.concurrentConsumers = concurrentConsumers; + } + + /** + * @param sessionTransacted the sessionTransacted to set + */ + public void setSessionTransacted(boolean sessionTransacted) { + this.sessionTransacted = sessionTransacted; + } + + /** + * @param pubSubDomain the pubSubDomain to set + */ + public void setPubSubDomain(boolean pubSubDomain) { + this.pubSubDomain = pubSubDomain; + } + + /** + * @return the messageListener + */ + public MessageDrivenPojo getMessageDrivenPojo() { + return messageListener; + } + + public boolean isRunning() { + return running; + } +} diff --git a/activemq-pool/src/test/java/org/apache/activemq/usecases/JDBCSpringTest.java b/activemq-pool/src/test/java/org/apache/activemq/usecases/JDBCSpringTest.java new file mode 100644 index 0000000000..8addee9408 --- /dev/null +++ b/activemq-pool/src/test/java/org/apache/activemq/usecases/JDBCSpringTest.java @@ -0,0 +1,153 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.activemq.usecases; + +import java.util.ArrayList; +import java.util.List; + +import javax.jms.ConnectionFactory; + +import junit.framework.TestCase; + +import org.apache.activemq.ActiveMQConnectionFactory; +import org.apache.activemq.ActiveMQPrefetchPolicy; +import org.apache.activemq.broker.BrokerFactory; +import org.apache.activemq.broker.BrokerService; +import org.apache.activemq.pool.PooledConnectionFactory; +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; + +public class JDBCSpringTest extends TestCase { + + private static Log log = LogFactory.getLog(JDBCSpringTest.class); + + int numberOfConsumerThreads = 50; + int numberOfProducerThreads = 50; + int numberOfMessages = 100; + int numberOfQueues = 5; + String url = "tcp://localhost:61616"; + + BrokerService broker; + + public void setUp() throws Exception { + broker = BrokerFactory.createBroker("xbean:activemq-spring-jdbc.xml"); + //broker.deleteAllMessages(); + broker.start(); + broker.waitUntilStarted(); + } + + + protected void tearDown() throws Exception { + broker.stop(); + broker.waitUntilStopped(); + } + + + public void testJDBCSpringTest() throws Exception { + log.info("Using " + numberOfConsumerThreads + " consumers, " + + numberOfProducerThreads + " producers, " + + numberOfMessages + " messages per publisher, and " + + numberOfQueues + " queues."); + + ConnectionFactory connectionFactory; + + ActiveMQPrefetchPolicy prefetch = new ActiveMQPrefetchPolicy(); + prefetch.setQueuePrefetch(1); + ActiveMQConnectionFactory amq = new ActiveMQConnectionFactory(url); + amq.setPrefetchPolicy(prefetch); + + connectionFactory = new PooledConnectionFactory(amq); + ((PooledConnectionFactory)connectionFactory).setMaxConnections(5); + + + StringBuffer buffer = new StringBuffer(); + for (int i=0; i<2048; i++) { + buffer.append("."); + } + String twoKbMessage = buffer.toString(); + + List ProducerThreads = new ArrayList(); + for (int i=0; i ConsumerThreads = new ArrayList(); + for (int i=0; i + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + diff --git a/activemq-pool/src/test/resources/log4j.properties b/activemq-pool/src/test/resources/log4j.properties new file mode 100755 index 0000000000..d95fe6f509 --- /dev/null +++ b/activemq-pool/src/test/resources/log4j.properties @@ -0,0 +1,35 @@ +## --------------------------------------------------------------------------- +## Licensed to the Apache Software Foundation (ASF) under one or more +## contributor license agreements. See the NOTICE file distributed with +## this work for additional information regarding copyright ownership. +## The ASF licenses this file to You under the Apache License, Version 2.0 +## (the "License"); you may not use this file except in compliance with +## the License. You may obtain a copy of the License at +## +## http://www.apache.org/licenses/LICENSE-2.0 +## +## Unless required by applicable law or agreed to in writing, software +## distributed under the License is distributed on an "AS IS" BASIS, +## WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +## See the License for the specific language governing permissions and +## limitations under the License. +## --------------------------------------------------------------------------- + +# +# The logging properties used during tests.. +# +log4j.rootLogger=INFO, out, stdout + +log4j.logger.org.apache.activemq.spring=WARN + +# CONSOLE appender not used by default +log4j.appender.stdout=org.apache.log4j.ConsoleAppender +log4j.appender.stdout.layout=org.apache.log4j.PatternLayout +log4j.appender.stdout.layout.ConversionPattern=%d [%-15.15t] %-5p %-30.30c{1} - %m%n + +# File appender +log4j.appender.out=org.apache.log4j.FileAppender +log4j.appender.out.layout=org.apache.log4j.PatternLayout +log4j.appender.out.layout.ConversionPattern=%d [%-15.15t] %-5p %-30.30c{1} - %m%n +log4j.appender.out.file=target/activemq-test.log +log4j.appender.out.append=true