removing JDBCSpringTest as it is moved to sys tests

git-svn-id: https://svn.apache.org/repos/asf/activemq/trunk@886833 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Bosanac Dejan 2009-12-03 15:57:19 +00:00
parent 4caad1a058
commit 72b73f06cd
4 changed files with 0 additions and 517 deletions

View File

@ -1,169 +0,0 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.usecases;
import java.util.Random;
import javax.jms.ConnectionFactory;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.springframework.jms.connection.SingleConnectionFactory;
import org.springframework.jms.listener.DefaultMessageListenerContainer;
public class ConsumerThread extends Thread {
private DefaultMessageListenerContainer container;
private MessageDrivenPojo messageListener;
private boolean run;
private String destination;
private ConnectionFactory connectionFactory;
private boolean durable;
private int concurrentConsumers;
private boolean sessionTransacted;
private boolean pubSubDomain;
private boolean running;
private Log log = LogFactory.getLog(ConsumerThread.class);
private int numberOfQueues;
private String consumerName;
@Override
public void run() {
run = true;
createContainer();
container.initialize();
container.start();
running = true;
while (run) {
try {
Thread.sleep(100);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
container.stop();
container.destroy();
if (connectionFactory instanceof SingleConnectionFactory) {
((SingleConnectionFactory)connectionFactory).destroy();
}
log.info("ConsumerThread closing down");
}
private DefaultMessageListenerContainer createContainer() {
Random generator = new Random(consumerName.hashCode());
int queueSuffix = generator.nextInt(numberOfQueues);
container = new DefaultMessageListenerContainer();
container.setPubSubDomain(pubSubDomain);
container.setDestinationName(destination + queueSuffix);
container.setMessageListener(messageListener);
container.setConnectionFactory(connectionFactory);
container.setConcurrentConsumers(concurrentConsumers);
container.setSessionTransacted(sessionTransacted);
container.afterPropertiesSet();
log.info("subscribing to " + destination + queueSuffix);
return container;
}
/**
* @param messageListener the messageListener to set
*/
public void setMessageDrivenPojo(MessageDrivenPojo messageListener) {
this.messageListener = messageListener;
}
/**
* @param run the run to set
*/
public void setRun(boolean run) {
this.run = run;
}
/**
* @param destination the destination to set
*/
public void setDestination(String destination) {
this.destination = destination;
}
public void setNumberOfQueues(int no) {
this.numberOfQueues = no;
}
public int getNumberOfQueues() {
return this.numberOfQueues;
}
public void setConsumerName(String name) {
this.consumerName = name;
}
/**
* @param connectionFactory the connectionFactory to set
*/
public void setConnectionFactory(ConnectionFactory connectionFactory) {
this.connectionFactory = connectionFactory;
}
/**
* @param durable the durable to set
*/
public void setDurable(boolean durable) {
this.durable = durable;
}
/**
* @param concurrentConsumers the concurrentConsumers to set
*/
public void setConcurrentConsumers(int concurrentConsumers) {
this.concurrentConsumers = concurrentConsumers;
}
/**
* @param sessionTransacted the sessionTransacted to set
*/
public void setSessionTransacted(boolean sessionTransacted) {
this.sessionTransacted = sessionTransacted;
}
/**
* @param pubSubDomain the pubSubDomain to set
*/
public void setPubSubDomain(boolean pubSubDomain) {
this.pubSubDomain = pubSubDomain;
}
/**
* @return the messageListener
*/
public MessageDrivenPojo getMessageDrivenPojo() {
return messageListener;
}
public boolean isRunning() {
return running;
}
}

View File

@ -1,150 +0,0 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.usecases;
import java.util.ArrayList;
import java.util.List;
import javax.jms.ConnectionFactory;
import junit.framework.TestCase;
import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.activemq.ActiveMQPrefetchPolicy;
import org.apache.activemq.broker.BrokerFactory;
import org.apache.activemq.broker.BrokerService;
import org.apache.activemq.pool.PooledConnectionFactory;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
public class JDBCSpringTest extends TestCase {
private static Log log = LogFactory.getLog(JDBCSpringTest.class);
int numberOfConsumerThreads = 20;
int numberOfProducerThreads = 20;
int numberOfMessages = 50;
int numberOfQueues = 5;
String url = "tcp://localhost:61616";
BrokerService broker;
public void setUp() throws Exception {
broker = BrokerFactory.createBroker("xbean:activemq-spring-jdbc.xml");
broker.start();
broker.waitUntilStarted();
}
protected void tearDown() throws Exception {
broker.stop();
broker.waitUntilStopped();
}
public void testJDBCSpringTest() throws Exception {
log.info("Using " + numberOfConsumerThreads + " consumers, " +
numberOfProducerThreads + " producers, " +
numberOfMessages + " messages per publisher, and " +
numberOfQueues + " queues.");
ConnectionFactory connectionFactory;
ActiveMQPrefetchPolicy prefetch = new ActiveMQPrefetchPolicy();
prefetch.setQueuePrefetch(1);
ActiveMQConnectionFactory amq = new ActiveMQConnectionFactory(url);
amq.setPrefetchPolicy(prefetch);
connectionFactory = new PooledConnectionFactory(amq);
((PooledConnectionFactory)connectionFactory).setMaxConnections(5);
StringBuffer buffer = new StringBuffer();
for (int i=0; i<2048; i++) {
buffer.append(".");
}
String twoKbMessage = buffer.toString();
List<ProducerThread> ProducerThreads = new ArrayList<ProducerThread>();
for (int i=0; i<numberOfProducerThreads; i++) {
ProducerThread thread = new ProducerThread();
thread.setMessage(twoKbMessage);
thread.setNumberOfMessagesToSend(numberOfMessages);
thread.setNumberOfQueues(numberOfQueues);
thread.setQueuePrefix("AMQ-2436.queue.");
thread.setConnectionFactory(connectionFactory);
//thread.setSendDelay(100);
ProducerThreads.add(thread);
}
List<Thread> ConsumerThreads = new ArrayList<Thread>();
for (int i=0; i<numberOfConsumerThreads; i++) {
ConsumerThread thread = new ConsumerThread();
MessageDrivenPojo mdp1 = new MessageDrivenPojo();
thread.setMessageDrivenPojo(mdp1);
thread.setConcurrentConsumers(1);
thread.setConnectionFactory(connectionFactory);
thread.setDestination("AMQ-2436.queue.");
thread.setPubSubDomain(false);
thread.setSessionTransacted(true);
thread.setNumberOfQueues(numberOfQueues);
thread.setConsumerName("consumer" + i);
ConsumerThreads.add(thread);
thread.start();
}
for (ProducerThread thread : ProducerThreads) {
thread.start();
}
boolean finished = false;
int previous = 0;
while (!finished) {
int totalMessages = 0;
for (Thread thread : ConsumerThreads) {
totalMessages += ((ConsumerThread)thread).getMessageDrivenPojo().getMessageCount();
}
log.info(totalMessages + " received so far...");
if (totalMessages != 0 && previous == totalMessages) {
for (Thread thread : ConsumerThreads) {
((ConsumerThread)thread).setRun(false);
}
fail("Received " + totalMessages + ", expected " + (numberOfMessages * numberOfProducerThreads));
}
previous = totalMessages;
if (totalMessages >= (numberOfMessages * numberOfProducerThreads)) {
finished = true;
log.info("Received all " + totalMessages + " messages. Finishing.");
for (Thread thread : ConsumerThreads) {
((ConsumerThread)thread).setRun(false);
}
for (Thread thread : ConsumerThreads) {
thread.join();
}
} else {
Thread.sleep(1000);
}
}
}
}

View File

@ -1,81 +0,0 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.usecases;
import java.io.Serializable;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicInteger;
import javax.jms.Message;
import javax.jms.MessageListener;
import javax.jms.TextMessage;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
public class MessageDrivenPojo implements MessageListener, Serializable {
private Log log = LogFactory.getLog(MessageDrivenPojo.class);
private AtomicInteger messageCount = new AtomicInteger();
/*
* (non-Javadoc)
* @see javax.jms.MessageListener#onMessage(javax.jms.Message)
*/
public void onMessage(Message message) {
messageCount.incrementAndGet();
if (log.isDebugEnabled()) {
try {
logMessage(message);
} catch (Exception e) {
log.error("Error:", e);
}
}
try {
Thread.sleep(200);
} catch (InterruptedException ex ) {
log.error(ex);
}
}
private void logMessage(Message message) throws Exception {
StringBuffer buffer = new StringBuffer();
buffer.append("\nJMSMessageID:");
buffer.append(message.getJMSMessageID());
buffer.append("\nJMSCorrelationID:");
buffer.append(message.getJMSMessageID());
buffer.append("\nMessage Contents:\n");
if (message instanceof TextMessage) {
buffer.append(((TextMessage)message).getText());
} else {
buffer.append(message.toString());
}
log.debug(buffer.toString());
}
/**
* @return the stats
*/
protected int getMessageCount() {
return messageCount.get();
}
}

View File

@ -1,117 +0,0 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.usecases;
import java.util.Random;
import javax.jms.ConnectionFactory;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.Session;
import org.springframework.jms.core.JmsTemplate;
import org.springframework.jms.core.MessageCreator;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
public class ProducerThread extends Thread {
private JmsTemplate jmsTemplate;
private int numberOfTopics;
private int numberOfMessagesToSend;
private int messagesSent;
private Random generator;
private String queuePrefix;
private ConnectionFactory connectionFactory;
private String message;
private MessageCreator messageCreator;
private int sendDelay;
private Log log = LogFactory.getLog(ProducerThread.class);
@Override
public void run() {
initialize();
Random generator = new Random(Thread.currentThread().getName().hashCode());
while (messagesSent < numberOfMessagesToSend) {
int queueSuffix = generator.nextInt(numberOfTopics);
jmsTemplate.send(queuePrefix + queueSuffix, messageCreator);
messagesSent++;
log.debug(Thread.currentThread().getName() +
": sent msg #" + messagesSent);
try {
Thread.sleep(sendDelay);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
log.info("ProducerThread shutting down.");
}
private void initialize() {
jmsTemplate = new JmsTemplate();
jmsTemplate.setPubSubDomain(false);
jmsTemplate.setConnectionFactory(connectionFactory);
messageCreator = new MessageCreator() {
public Message createMessage(Session session) throws JMSException {
return session.createTextMessage(message);
}
};
}
/**
* @param numberOfTopics the numberOfTopics to set
*/
protected void setNumberOfQueues(int numberOfTopics) {
this.numberOfTopics = numberOfTopics;
}
/**
* @param queuePrefix the queuePrefix to set
*/
protected void setQueuePrefix(String queuePrefix) {
this.queuePrefix = queuePrefix;
}
/**
* @param connectionFactory the connectionFactory to set
*/
protected void setConnectionFactory(ConnectionFactory connectionFactory) {
this.connectionFactory = connectionFactory;
}
/**
* @param message the message to set
*/
protected void setMessage(String message) {
this.message = message;
}
/**
* @param numberOfMessagesToSend the numberOfMessagesToSend to set
*/
protected void setNumberOfMessagesToSend(int numberOfMessagesToSend) {
this.numberOfMessagesToSend = numberOfMessagesToSend;
}
public void setSendDelay(int sendDelay) {
this.sendDelay = sendDelay;
}
public int getMessagesSent() {
return messagesSent;
}
}