https://issues.apache.org/activemq/browse/AMQ-1886 - another test, but issue cannot be reproduced with embeded derby

git-svn-id: https://svn.apache.org/repos/asf/activemq/trunk@820268 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Bosanac Dejan 2009-09-30 13:36:49 +00:00
parent 3f84ef2906
commit 20972bfdaf
7 changed files with 632 additions and 0 deletions

View File

@ -94,6 +94,21 @@
<artifactId>spring-jms</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.apache.xbean</groupId>
<artifactId>xbean-spring</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>log4j</groupId>
<artifactId>log4j</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.apache.derby</groupId>
<artifactId>derby</artifactId>
<scope>test</scope>
</dependency>
</dependencies>
</project>

View File

@ -0,0 +1,174 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.usecases;
import java.util.Random;
import javax.jms.ConnectionFactory;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.springframework.jms.connection.SingleConnectionFactory;
import org.springframework.jms.listener.DefaultMessageListenerContainer;
public class ConsumerThread extends Thread {
private DefaultMessageListenerContainer container;
private MessageDrivenPojo messageListener;
private boolean run;
private String destination;
private ConnectionFactory connectionFactory;
private boolean durable;
private int concurrentConsumers;
private boolean sessionTransacted;
private boolean pubSubDomain;
private boolean running;
private Log log = LogFactory.getLog(ConsumerThread.class);
private int numberOfQueues;
@Override
public void run() {
run = true;
createContainer();
container.initialize();
container.start();
running = true;
while (run) {
try {
Thread.sleep(3000);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
container.stop();
container.destroy();
if (connectionFactory instanceof SingleConnectionFactory) {
((SingleConnectionFactory)connectionFactory).destroy();
}
log.info("ConsumerThread1 closing down");
}
private DefaultMessageListenerContainer createContainer() {
Random generator = new Random();
int queueSuffix = generator.nextInt(numberOfQueues);
container = new DefaultMessageListenerContainer();
container.setPubSubDomain(pubSubDomain);
container.setDestinationName(destination + queueSuffix);
container.setMessageListener(messageListener);
container.setConnectionFactory(connectionFactory);
container.setConcurrentConsumers(concurrentConsumers);
container.setSessionTransacted(sessionTransacted);
//container.setCacheLevel(DefaultMessageListenerContainer.CACHE_CONSUMER);
//container.setMaxConcurrentConsumers(concurrentConsumers);
//container.setAcceptMessagesWhileStopping(false);
//container.setAutoStartup(false);
//without setting a tx manager, this will use local JMS tx.
/*
if (durable) {
container.setSubscriptionDurable(true);
container.setDurableSubscriptionName("ConsumerThread1" + Thread.currentThread().getId());
}
*/
container.afterPropertiesSet();
return container;
}
/**
* @param messageListener the messageListener to set
*/
public void setMessageDrivenPojo(MessageDrivenPojo messageListener) {
this.messageListener = messageListener;
}
/**
* @param run the run to set
*/
public void setRun(boolean run) {
this.run = run;
}
/**
* @param destination the destination to set
*/
public void setDestination(String destination) {
this.destination = destination;
}
public void setNumberOfQueues(int no) {
this.numberOfQueues = no;
}
public int getNumberOfQueues() {
return this.numberOfQueues;
}
/**
* @param connectionFactory the connectionFactory to set
*/
public void setConnectionFactory(ConnectionFactory connectionFactory) {
this.connectionFactory = connectionFactory;
}
/**
* @param durable the durable to set
*/
public void setDurable(boolean durable) {
this.durable = durable;
}
/**
* @param concurrentConsumers the concurrentConsumers to set
*/
public void setConcurrentConsumers(int concurrentConsumers) {
this.concurrentConsumers = concurrentConsumers;
}
/**
* @param sessionTransacted the sessionTransacted to set
*/
public void setSessionTransacted(boolean sessionTransacted) {
this.sessionTransacted = sessionTransacted;
}
/**
* @param pubSubDomain the pubSubDomain to set
*/
public void setPubSubDomain(boolean pubSubDomain) {
this.pubSubDomain = pubSubDomain;
}
/**
* @return the messageListener
*/
public MessageDrivenPojo getMessageDrivenPojo() {
return messageListener;
}
public boolean isRunning() {
return running;
}
}

View File

@ -0,0 +1,153 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.usecases;
import java.util.ArrayList;
import java.util.List;
import javax.jms.ConnectionFactory;
import junit.framework.TestCase;
import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.activemq.ActiveMQPrefetchPolicy;
import org.apache.activemq.broker.BrokerFactory;
import org.apache.activemq.broker.BrokerService;
import org.apache.activemq.pool.PooledConnectionFactory;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
public class JDBCSpringTest extends TestCase {
private static Log log = LogFactory.getLog(JDBCSpringTest.class);
int numberOfConsumerThreads = 50;
int numberOfProducerThreads = 50;
int numberOfMessages = 100;
int numberOfQueues = 5;
String url = "tcp://localhost:61616";
BrokerService broker;
public void setUp() throws Exception {
broker = BrokerFactory.createBroker("xbean:activemq-spring-jdbc.xml");
//broker.deleteAllMessages();
broker.start();
broker.waitUntilStarted();
}
protected void tearDown() throws Exception {
broker.stop();
broker.waitUntilStopped();
}
public void testJDBCSpringTest() throws Exception {
log.info("Using " + numberOfConsumerThreads + " consumers, " +
numberOfProducerThreads + " producers, " +
numberOfMessages + " messages per publisher, and " +
numberOfQueues + " queues.");
ConnectionFactory connectionFactory;
ActiveMQPrefetchPolicy prefetch = new ActiveMQPrefetchPolicy();
prefetch.setQueuePrefetch(1);
ActiveMQConnectionFactory amq = new ActiveMQConnectionFactory(url);
amq.setPrefetchPolicy(prefetch);
connectionFactory = new PooledConnectionFactory(amq);
((PooledConnectionFactory)connectionFactory).setMaxConnections(5);
StringBuffer buffer = new StringBuffer();
for (int i=0; i<2048; i++) {
buffer.append(".");
}
String twoKbMessage = buffer.toString();
List<ProducerThread> ProducerThreads = new ArrayList<ProducerThread>();
for (int i=0; i<numberOfProducerThreads; i++) {
ProducerThread thread = new ProducerThread();
thread.setMessage(twoKbMessage);
thread.setNumberOfMessagesToSend(numberOfMessages);
thread.setNumberOfQueues(numberOfQueues);
thread.setQueuePrefix("DEV-1786.queue.");
thread.setConnectionFactory(connectionFactory);
thread.setSendDelay(100);
ProducerThreads.add(thread);
}
List<Thread> ConsumerThreads = new ArrayList<Thread>();
for (int i=0; i<numberOfConsumerThreads; i++) {
ConsumerThread thread = new ConsumerThread();
MessageDrivenPojo mdp1 = new MessageDrivenPojo();
thread.setMessageDrivenPojo(mdp1);
thread.setConcurrentConsumers(1);
thread.setConnectionFactory(connectionFactory);
thread.setDestination("DEV-1786.queue.");
thread.setPubSubDomain(false);
thread.setSessionTransacted(true);
thread.setNumberOfQueues(numberOfQueues);
ConsumerThreads.add(thread);
thread.start();
while (!thread.isRunning()) {
Thread.sleep(200);
}
}
Thread.sleep(5000);
for (ProducerThread thread : ProducerThreads) {
thread.start();
}
boolean finished = false;
int retry = 0;
while (!finished) {
int totalMessages = 0;
retry++;
for (Thread thread : ConsumerThreads) {
totalMessages += ((ConsumerThread)thread).getMessageDrivenPojo().getMessageCount();
}
if (totalMessages == (numberOfMessages * numberOfProducerThreads)) {
finished = true;
log.info("Received all " + totalMessages + " messages. Finishing.");
for (Thread thread : ConsumerThreads) {
((ConsumerThread)thread).setRun(false);
}
for (Thread thread : ConsumerThreads) {
thread.join();
}
} else {
if (retry == 10) {
fail("Received " + totalMessages + ", expected " + (numberOfMessages * numberOfProducerThreads));
}
Thread.sleep(10000);
log.info(totalMessages + " received so far...");
}
}
}
}

View File

@ -0,0 +1,81 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.usecases;
import java.io.Serializable;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicInteger;
import javax.jms.Message;
import javax.jms.MessageListener;
import javax.jms.TextMessage;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
public class MessageDrivenPojo implements MessageListener, Serializable {
private Log log = LogFactory.getLog(MessageDrivenPojo.class);
private AtomicInteger messageCount = new AtomicInteger();
/*
* (non-Javadoc)
* @see javax.jms.MessageListener#onMessage(javax.jms.Message)
*/
public void onMessage(Message message) {
messageCount.incrementAndGet();
if (log.isDebugEnabled()) {
try {
logMessage(message);
} catch (Exception e) {
log.error("Error:", e);
}
}
try {
Thread.sleep(200);
} catch (InterruptedException ex ) {
log.error(ex);
}
}
private void logMessage(Message message) throws Exception {
StringBuffer buffer = new StringBuffer();
buffer.append("\nJMSMessageID:");
buffer.append(message.getJMSMessageID());
buffer.append("\nJMSCorrelationID:");
buffer.append(message.getJMSMessageID());
buffer.append("\nMessage Contents:\n");
if (message instanceof TextMessage) {
buffer.append(((TextMessage)message).getText());
} else {
buffer.append(message.toString());
}
log.debug(buffer.toString());
}
/**
* @return the stats
*/
protected int getMessageCount() {
return messageCount.get();
}
}

View File

@ -0,0 +1,116 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.usecases;
import java.util.Random;
import javax.jms.ConnectionFactory;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.Session;
import org.springframework.jms.core.JmsTemplate;
import org.springframework.jms.core.MessageCreator;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
public class ProducerThread extends Thread {
private JmsTemplate jmsTemplate;
private int numberOfTopics;
private int numberOfMessagesToSend;
private int messagesSent;
private Random generator = new Random();
private String queuePrefix;
private ConnectionFactory connectionFactory;
private String message;
private MessageCreator messageCreator;
private int sendDelay;
private Log log = LogFactory.getLog(ProducerThread.class);
@Override
public void run() {
initialize();
while (messagesSent < numberOfMessagesToSend) {
int queueSuffix = generator.nextInt(numberOfTopics);
jmsTemplate.send(queuePrefix + queueSuffix, messageCreator);
messagesSent++;
log.debug(Thread.currentThread().getName() +
": sent msg #" + messagesSent);
try {
Thread.sleep(sendDelay);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
log.info("ProducerThread shutting down.");
}
private void initialize() {
jmsTemplate = new JmsTemplate();
jmsTemplate.setPubSubDomain(false);
jmsTemplate.setConnectionFactory(connectionFactory);
messageCreator = new MessageCreator() {
public Message createMessage(Session session) throws JMSException {
return session.createTextMessage(message);
}
};
}
/**
* @param numberOfTopics the numberOfTopics to set
*/
protected void setNumberOfQueues(int numberOfTopics) {
this.numberOfTopics = numberOfTopics;
}
/**
* @param queuePrefix the queuePrefix to set
*/
protected void setQueuePrefix(String queuePrefix) {
this.queuePrefix = queuePrefix;
}
/**
* @param connectionFactory the connectionFactory to set
*/
protected void setConnectionFactory(ConnectionFactory connectionFactory) {
this.connectionFactory = connectionFactory;
}
/**
* @param message the message to set
*/
protected void setMessage(String message) {
this.message = message;
}
/**
* @param numberOfMessagesToSend the numberOfMessagesToSend to set
*/
protected void setNumberOfMessagesToSend(int numberOfMessagesToSend) {
this.numberOfMessagesToSend = numberOfMessagesToSend;
}
public void setSendDelay(int sendDelay) {
this.sendDelay = sendDelay;
}
public int getMessagesSent() {
return messagesSent;
}
}

View File

@ -0,0 +1,58 @@
<!--
Licensed to the Apache Software Foundation (ASF) under one or more
contributor license agreements. See the NOTICE file distributed with
this work for additional information regarding copyright ownership.
The ASF licenses this file to You under the Apache License, Version 2.0
(the "License"); you may not use this file except in compliance with
the License. You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
-->
<!-- START SNIPPET: example -->
<beans
xmlns="http://www.springframework.org/schema/beans"
xmlns:amq="http://activemq.apache.org/schema/core"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-2.0.xsd
http://activemq.apache.org/schema/core http://activemq.apache.org/schema/core/activemq-core.xsd
http://activemq.apache.org/camel/schema/spring http://activemq.apache.org/camel/schema/spring/camel-spring.xsd">
<broker xmlns="http://activemq.apache.org/schema/core" brokerName="localhost" dataDirectory="target/" useJmx="false">
<persistenceAdapter>
<jdbcPersistenceAdapter dataSource="#derby-ds" dataDirectory="traget/"/>
</persistenceAdapter>
<systemUsage>
<systemUsage>
<memoryUsage>
<memoryUsage limit="20 mb"/>
</memoryUsage>
<storeUsage>
<storeUsage limit="1 gb" name="foo"/>
</storeUsage>
<tempUsage>
<tempUsage limit="100 mb"/>
</tempUsage>
</systemUsage>
</systemUsage>
<transportConnectors>
<transportConnector name="openwire" uri="tcp://localhost:61616" discoveryUri="multicast://default"/>
</transportConnectors>
</broker>
<bean id="derby-ds" class="org.apache.derby.jdbc.EmbeddedDataSource">
<property name="databaseName" value="derbydb"/>
<property name="createDatabase" value="create"/>
</bean>
</beans>
<!-- END SNIPPET: example -->

View File

@ -0,0 +1,35 @@
## ---------------------------------------------------------------------------
## Licensed to the Apache Software Foundation (ASF) under one or more
## contributor license agreements. See the NOTICE file distributed with
## this work for additional information regarding copyright ownership.
## The ASF licenses this file to You under the Apache License, Version 2.0
## (the "License"); you may not use this file except in compliance with
## the License. You may obtain a copy of the License at
##
## http://www.apache.org/licenses/LICENSE-2.0
##
## Unless required by applicable law or agreed to in writing, software
## distributed under the License is distributed on an "AS IS" BASIS,
## WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
## See the License for the specific language governing permissions and
## limitations under the License.
## ---------------------------------------------------------------------------
#
# The logging properties used during tests..
#
log4j.rootLogger=INFO, out, stdout
log4j.logger.org.apache.activemq.spring=WARN
# CONSOLE appender not used by default
log4j.appender.stdout=org.apache.log4j.ConsoleAppender
log4j.appender.stdout.layout=org.apache.log4j.PatternLayout
log4j.appender.stdout.layout.ConversionPattern=%d [%-15.15t] %-5p %-30.30c{1} - %m%n
# File appender
log4j.appender.out=org.apache.log4j.FileAppender
log4j.appender.out.layout=org.apache.log4j.PatternLayout
log4j.appender.out.layout.ConversionPattern=%d [%-15.15t] %-5p %-30.30c{1} - %m%n
log4j.appender.out.file=target/activemq-test.log
log4j.appender.out.append=true