Added slow consumer test

git-svn-id: https://svn.apache.org/repos/asf/incubator/activemq/trunk@383485 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Robert Davies 2006-03-06 07:24:16 +00:00
parent 64bf22ce94
commit 6892f8ca04
4 changed files with 152 additions and 6 deletions

View File

@ -44,6 +44,8 @@ public class SimpleTopicTest extends TestCase{
protected int PAYLOAD_SIZE=1024;
protected int MESSAGE_COUNT=1000000;
protected byte[] array=null;
protected ConnectionFactory factory;
protected Destination destination;
/**
* Sets up a test where the producer and consumer have their own connection.
@ -58,21 +60,21 @@ public class SimpleTopicTest extends TestCase{
for(int i=0;i<array.length;i++){
array[i]=(byte) i;
}
ConnectionFactory fac=createConnectionFactory();
Connection con=fac.createConnection();
factory=createConnectionFactory();
Connection con=factory.createConnection();
Session session=con.createSession(false,Session.AUTO_ACKNOWLEDGE);
payload=session.createBytesMessage();
payload.writeBytes(array);
Destination dest=createDestination(session,DESTINATION_NAME);
destination=createDestination(session,DESTINATION_NAME);
con.close();
producers=new PerfProducer[NUMBER_OF_PRODUCERS];
consumers=new PerfConsumer[NUMBER_OF_CONSUMERS];
for(int i=0;i<NUMBER_OF_CONSUMERS;i++){
consumers[i]=createConsumer(fac,dest,i);
consumers[i]=createConsumer(factory,destination,i);
consumers[i].start();
}
for(int i=0;i<NUMBER_OF_PRODUCERS;i++){
producers[i]=createProducer(fac,dest,i);
producers[i]=createProducer(factory,destination,i);
producers[i].start();
}
super.setUp();
@ -125,7 +127,7 @@ public class SimpleTopicTest extends TestCase{
ActiveMQConnectionFactory cf = new ActiveMQConnectionFactory(bindAddress);
// ActiveMQConnectionFactory cf = new ActiveMQConnectionFactory("vm://localhost?marshal=true");
// ActiveMQConnectionFactory cf = new ActiveMQConnectionFactory("vm://localhost?marshal=true&wireFormat.cacheEnabled=false");
cf.setAsyncDispatch(false);
// cf.setAsyncDispatch(false);
return cf;
}

View File

@ -0,0 +1,41 @@
/**
*
* Copyright 2005-2006 The Apache Software Foundation
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
* an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
* specific language governing permissions and limitations under the License.
*/
package org.apache.activemq.perf;
import javax.jms.ConnectionFactory;
import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.Message;
/**
* @version $Revision: 1.3 $
*/
public class SlowConsumer extends PerfConsumer{
public SlowConsumer(ConnectionFactory fac,Destination dest,String consumerName,boolean slowConsumer)
throws JMSException{
super(fac,dest,consumerName);
}
public SlowConsumer(ConnectionFactory fac,Destination dest) throws JMSException{
super(fac,dest,null);
}
public void onMessage(Message msg){
super.onMessage(msg);
try{
Thread.sleep(10000);
}catch(InterruptedException e){
e.printStackTrace();
}
}
}

View File

@ -0,0 +1,64 @@
/**
*
* Copyright 2005-2006 The Apache Software Foundation
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.perf;
import javax.jms.BytesMessage;
import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.Session;
import junit.framework.TestCase;
import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.activemq.broker.BrokerService;
import org.apache.activemq.xbean.BrokerFactoryBean;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.springframework.core.io.ClassPathResource;
import org.springframework.core.io.Resource;
/**
* @version $Revision: 1.3 $
*/
public class SlowConsumerTopicTest extends SimpleTopicTest{
protected PerfConsumer[] slowConsumers;
protected int NUMBER_OF_SLOW_CONSUMERS=1;
protected void setUp() throws Exception{
super.setUp();
slowConsumers=new SlowConsumer[NUMBER_OF_SLOW_CONSUMERS];
for(int i=0;i<NUMBER_OF_SLOW_CONSUMERS;i++){
consumers[i]=createSlowConsumer(factory,destination,i);
consumers[i].start();
}
}
protected PerfConsumer createSlowConsumer(ConnectionFactory fac,Destination dest,int number) throws JMSException{
return new SlowConsumer(fac,dest);
}
protected BrokerService createBroker() throws Exception{
Resource resource=new ClassPathResource("org/apache/activemq/perf/slowConsumerBroker.xml");
BrokerFactoryBean factory=new BrokerFactoryBean(resource);
factory.afterPropertiesSet();
BrokerService broker =factory.getBroker();
broker.start();
return broker;
}
}

View File

@ -0,0 +1,39 @@
<?xml version="1.0" encoding="UTF-8"?>
<!--
Copyright 2005-2006 The Apache Software Foundation
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
-->
<beans xmlns="http://activemq.org/config/1.0">
<broker brokerName="slowConsumerBroker" persistent="true" useShutdownHook="false">
<transportConnectors>
<transportConnector uri="tcp://localhost:61616"/>
</transportConnectors>
<destinationPolicy>
<policyMap>
<policyEntries>
<policyEntry topic=">">
<!-- lets force old messages to be discarded for slow consumers -->
<pendingMessageLimitStrategy>
<constantPendingMessageLimitStrategy limit="10"/>
</pendingMessageLimitStrategy>
</policyEntry>
</policyEntries>
</policyMap>
</destinationPolicy>
</broker>
</beans>