fix some problems in Quick Journal - now message containers are using Kaha maps instead of lists

git-svn-id: https://svn.apache.org/repos/asf/activemq/trunk@504501 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Robert Davies 2007-02-07 11:12:53 +00:00
parent d32ae27f18
commit 6e7e3abf5d
11 changed files with 86 additions and 126 deletions

View File

@ -22,12 +22,12 @@ import java.util.concurrent.atomic.AtomicLong;
import org.apache.activemq.broker.region.Destination; import org.apache.activemq.broker.region.Destination;
import org.apache.activemq.broker.region.MessageReference; import org.apache.activemq.broker.region.MessageReference;
import org.apache.activemq.command.Message; import org.apache.activemq.command.Message;
import org.apache.activemq.kaha.CommandMarshaller;
import org.apache.activemq.kaha.ListContainer; import org.apache.activemq.kaha.ListContainer;
import org.apache.activemq.kaha.Store; import org.apache.activemq.kaha.Store;
import org.apache.activemq.memory.UsageListener; import org.apache.activemq.memory.UsageListener;
import org.apache.activemq.memory.UsageManager; import org.apache.activemq.memory.UsageManager;
import org.apache.activemq.openwire.OpenWireFormat; import org.apache.activemq.openwire.OpenWireFormat;
import org.apache.activemq.store.kahadaptor.CommandMarshaller;
import org.apache.commons.logging.Log; import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory; import org.apache.commons.logging.LogFactory;

View File

@ -15,7 +15,7 @@
* See the License for the specific language governing permissions and * See the License for the specific language governing permissions and
* limitations under the License. * limitations under the License.
*/ */
package org.apache.activemq.store.kahadaptor; package org.apache.activemq.kaha;
import java.io.DataInput; import java.io.DataInput;
import java.io.DataOutput; import java.io.DataOutput;

View File

@ -20,21 +20,20 @@ import java.util.HashSet;
import java.util.Iterator; import java.util.Iterator;
import java.util.Set; import java.util.Set;
import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentHashMap;
import org.apache.activemq.broker.ConnectionContext; import org.apache.activemq.broker.ConnectionContext;
import org.apache.activemq.command.ActiveMQDestination; import org.apache.activemq.command.ActiveMQDestination;
import org.apache.activemq.command.ActiveMQQueue; import org.apache.activemq.command.ActiveMQQueue;
import org.apache.activemq.command.ActiveMQTopic; import org.apache.activemq.command.ActiveMQTopic;
import org.apache.activemq.command.Command;
import org.apache.activemq.command.Message; import org.apache.activemq.command.Message;
import org.apache.activemq.command.MessageId; import org.apache.activemq.command.MessageId;
import org.apache.activemq.kaha.CommandMarshaller;
import org.apache.activemq.kaha.ListContainer; import org.apache.activemq.kaha.ListContainer;
import org.apache.activemq.kaha.MapContainer; import org.apache.activemq.kaha.MapContainer;
import org.apache.activemq.kaha.Marshaller; import org.apache.activemq.kaha.Marshaller;
import org.apache.activemq.kaha.MessageIdMarshaller;
import org.apache.activemq.kaha.MessageMarshaller; import org.apache.activemq.kaha.MessageMarshaller;
import org.apache.activemq.kaha.Store; import org.apache.activemq.kaha.Store;
import org.apache.activemq.kaha.StoreFactory; import org.apache.activemq.kaha.StoreFactory;
import org.apache.activemq.kaha.StringMarshaller;
import org.apache.activemq.memory.UsageManager; import org.apache.activemq.memory.UsageManager;
import org.apache.activemq.openwire.OpenWireFormat; import org.apache.activemq.openwire.OpenWireFormat;
import org.apache.activemq.store.MessageStore; import org.apache.activemq.store.MessageStore;
@ -78,7 +77,7 @@ public class KahaPersistenceAdapter implements PersistenceAdapter{
Set<ActiveMQDestination> rc=new HashSet<ActiveMQDestination>(); Set<ActiveMQDestination> rc=new HashSet<ActiveMQDestination>();
try{ try{
Store store=getStore(); Store store=getStore();
for(Iterator i=store.getListContainerIds().iterator();i.hasNext();){ for(Iterator i=store.getMapContainerIds().iterator();i.hasNext();){
Object obj=i.next(); Object obj=i.next();
if(obj instanceof ActiveMQDestination){ if(obj instanceof ActiveMQDestination){
rc.add((ActiveMQDestination) obj); rc.add((ActiveMQDestination) obj);

View File

@ -110,7 +110,7 @@ public class KahaReferenceStore implements ReferenceStore{
public void addReferenceFileIdsInUse(Set<Integer> rc){ public void addReferenceFileIdsInUse(Set<Integer> rc){
for(StoreEntry entry=messageContainer.getFirst();entry!=null;entry=messageContainer.getNext(entry)){ for(StoreEntry entry=messageContainer.getFirst();entry!=null;entry=messageContainer.getNext(entry)){
ReferenceRecord msg=(ReferenceRecord)messageContainer.get(entry); ReferenceRecord msg=(ReferenceRecord)messageContainer.getValue(entry);
rc.add(msg.data.getFileId()); rc.add(msg.data.getFileId());
} }
} }

View File

@ -32,6 +32,7 @@ import org.apache.activemq.command.MessageId;
import org.apache.activemq.kaha.ListContainer; import org.apache.activemq.kaha.ListContainer;
import org.apache.activemq.kaha.MapContainer; import org.apache.activemq.kaha.MapContainer;
import org.apache.activemq.kaha.Marshaller; import org.apache.activemq.kaha.Marshaller;
import org.apache.activemq.kaha.MessageIdMarshaller;
import org.apache.activemq.kaha.MessageMarshaller; import org.apache.activemq.kaha.MessageMarshaller;
import org.apache.activemq.kaha.Store; import org.apache.activemq.kaha.Store;
import org.apache.activemq.store.MessageStore; import org.apache.activemq.store.MessageStore;

View File

@ -100,7 +100,7 @@ public class KahaTopicReferenceStore extends KahaReferenceStore implements Topic
for(StoreEntry entry=ackContainer.getFirst();entry!=null;entry=ackContainer.getNext(entry)){ for(StoreEntry entry=ackContainer.getFirst();entry!=null;entry=ackContainer.getNext(entry)){
TopicSubAck subAck=(TopicSubAck)ackContainer.get(entry); TopicSubAck subAck=(TopicSubAck)ackContainer.get(entry);
if(subAck.getCount()>0){ if(subAck.getCount()>0){
ReferenceRecord rr=(ReferenceRecord)messageContainer.get(subAck.getMessageEntry()); ReferenceRecord rr=(ReferenceRecord)messageContainer.getValue(subAck.getMessageEntry());
rc.add(rr.data.getFileId()); rc.add(rr.data.getFileId());
} }
} }

View File

@ -1,43 +0,0 @@
/**
*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.store.kahadaptor;
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import org.apache.activemq.command.MessageId;
import org.apache.activemq.kaha.Marshaller;
/**
* Marshall an AtomicInteger
* @version $Revision: 1.10 $
*/
public class MessageIdMarshaller implements Marshaller<MessageId>{
public void writePayload(MessageId mid,DataOutput dataOut) throws IOException{
dataOut.writeUTF(mid.toString());
}
public MessageId readPayload(DataInput dataIn) throws IOException{
String str = dataIn.readUTF();
return new MessageId(str);
}
}

View File

@ -48,6 +48,7 @@ import org.apache.activemq.command.JournalTrace;
import org.apache.activemq.command.JournalTransaction; import org.apache.activemq.command.JournalTransaction;
import org.apache.activemq.command.Message; import org.apache.activemq.command.Message;
import org.apache.activemq.command.MessageAck; import org.apache.activemq.command.MessageAck;
import org.apache.activemq.kaha.CommandMarshaller;
import org.apache.activemq.kaha.ListContainer; import org.apache.activemq.kaha.ListContainer;
import org.apache.activemq.kaha.MapContainer; import org.apache.activemq.kaha.MapContainer;
import org.apache.activemq.kaha.Store; import org.apache.activemq.kaha.Store;
@ -60,9 +61,6 @@ import org.apache.activemq.store.MessageStore;
import org.apache.activemq.store.PersistenceAdapter; import org.apache.activemq.store.PersistenceAdapter;
import org.apache.activemq.store.TopicMessageStore; import org.apache.activemq.store.TopicMessageStore;
import org.apache.activemq.store.TransactionStore; import org.apache.activemq.store.TransactionStore;
import org.apache.activemq.store.kahadaptor.AtomicIntegerMarshaller;
import org.apache.activemq.store.kahadaptor.CommandMarshaller;
import org.apache.activemq.store.kahadaptor.KahaTopicMessageStore;
import org.apache.activemq.store.kahadaptor.TopicSubAckMarshaller; import org.apache.activemq.store.kahadaptor.TopicSubAckMarshaller;
import org.apache.activemq.store.rapid.RapidTransactionStore.Tx; import org.apache.activemq.store.rapid.RapidTransactionStore.Tx;
import org.apache.activemq.store.rapid.RapidTransactionStore.TxOperation; import org.apache.activemq.store.rapid.RapidTransactionStore.TxOperation;

View File

@ -1,20 +1,17 @@
/** /**
* *
* Licensed to the Apache Software Foundation (ASF) under one or more * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE
* contributor license agreements. See the NOTICE file distributed with * file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file
* this work for additional information regarding copyright ownership. * to You under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the
* The ASF licenses this file to You under the Apache License, Version 2.0 * License. You may obtain a copy of the License at
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
* *
* http://www.apache.org/licenses/LICENSE-2.0 * http://www.apache.org/licenses/LICENSE-2.0
* *
* Unless required by applicable law or agreed to in writing, software * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
* distributed under the License is distributed on an "AS IS" BASIS, * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. * specific language governing permissions and limitations under the License.
* See the License for the specific language governing permissions and
* limitations under the License.
*/ */
package org.apache.activemq.perf; package org.apache.activemq.perf;
import javax.jms.Connection; import javax.jms.Connection;
@ -22,37 +19,38 @@ import javax.jms.ConnectionFactory;
import javax.jms.Destination; import javax.jms.Destination;
import javax.jms.JMSException; import javax.jms.JMSException;
import javax.jms.Session; import javax.jms.Session;
import junit.framework.TestCase; import junit.framework.TestCase;
import org.apache.activemq.ActiveMQConnectionFactory; import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.activemq.broker.BrokerService; import org.apache.activemq.broker.BrokerService;
import org.apache.commons.logging.Log; import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory; import org.apache.commons.logging.LogFactory;
/** /**
* @version $Revision: 1.3 $ * @version $Revision: 1.3 $
*/ */
public class SimpleTopicTest extends TestCase{ public class SimpleTopicTest extends TestCase{
private final Log log=LogFactory.getLog(getClass());
private final Log log=LogFactory.getLog(getClass());
protected BrokerService broker; protected BrokerService broker;
// protected String bindAddress="tcp://localhost:61616?wireFormat.cacheEnabled=true&wireFormat.tightEncodingEnabled=true&jms.useAsyncSend=false"; // protected String
// bindAddress="tcp://localhost:61616?wireFormat.cacheEnabled=true&wireFormat.tightEncodingEnabled=true&jms.useAsyncSend=false";
protected String bindAddress="tcp://localhost:61616?wireFormat.cacheEnabled=true&wireFormat.tightEncodingEnabled=true&jms.useAsyncSend=true"; protected String bindAddress="tcp://localhost:61616?wireFormat.cacheEnabled=true&wireFormat.tightEncodingEnabled=true&jms.useAsyncSend=true";
//protected String bindAddress="tcp://localhost:61616?wireFormat.cacheEnabled=true&wireFormat.tightEncodingEnabled=false"; // protected String
//protected String bindAddress="vm://localhost?marshal=true"; // bindAddress="tcp://localhost:61616?wireFormat.cacheEnabled=true&wireFormat.tightEncodingEnabled=false";
//protected String bindAddress="vm://localhost"; // protected String bindAddress="vm://localhost?marshal=true";
// protected String bindAddress="vm://localhost";
protected PerfProducer[] producers; protected PerfProducer[] producers;
protected PerfConsumer[] consumers; protected PerfConsumer[] consumers;
protected String DESTINATION_NAME=getClass().getName(); protected String DESTINATION_NAME=getClass().getName();
protected int SAMPLE_COUNT = 30; protected int SAMPLE_COUNT=30;
protected long SAMPLE_INTERVAL = 2000; protected long SAMPLE_INTERVAL=2000;
protected int NUMBER_OF_CONSUMERS=10; protected int NUMBER_OF_CONSUMERS=1;
protected int NUMBER_OF_PRODUCERS=1; protected int NUMBER_OF_PRODUCERS=1;
protected int PAYLOAD_SIZE=1024; protected int PAYLOAD_SIZE=1024;
protected byte[] array=null; protected byte[] array=null;
protected ConnectionFactory factory; protected ConnectionFactory factory;
protected Destination destination; protected Destination destination;
protected long CONSUMER_SLEEP_DURATION = 0; protected long CONSUMER_SLEEP_DURATION=0;
/** /**
* Sets up a test where the producer and consumer have their own connection. * Sets up a test where the producer and consumer have their own connection.
@ -66,11 +64,9 @@ public class SimpleTopicTest extends TestCase{
factory=createConnectionFactory(); factory=createConnectionFactory();
Connection con=factory.createConnection(); Connection con=factory.createConnection();
Session session=con.createSession(false,Session.AUTO_ACKNOWLEDGE); Session session=con.createSession(false,Session.AUTO_ACKNOWLEDGE);
destination=createDestination(session,DESTINATION_NAME); destination=createDestination(session,DESTINATION_NAME);
log.info("Testing against destination: "+destination); log.info("Testing against destination: "+destination);
log.info("Running "+NUMBER_OF_PRODUCERS+" producer(s) and "+NUMBER_OF_CONSUMERS+" consumer(s)"); log.info("Running "+NUMBER_OF_PRODUCERS+" producer(s) and "+NUMBER_OF_CONSUMERS+" consumer(s)");
con.close(); con.close();
producers=new PerfProducer[NUMBER_OF_PRODUCERS]; producers=new PerfProducer[NUMBER_OF_PRODUCERS];
consumers=new PerfConsumer[NUMBER_OF_CONSUMERS]; consumers=new PerfConsumer[NUMBER_OF_CONSUMERS];
@ -81,7 +77,7 @@ public class SimpleTopicTest extends TestCase{
for(int i=0;i<NUMBER_OF_PRODUCERS;i++){ for(int i=0;i<NUMBER_OF_PRODUCERS;i++){
array=new byte[PAYLOAD_SIZE]; array=new byte[PAYLOAD_SIZE];
for(int j=i;j<array.length;j++){ for(int j=i;j<array.length;j++){
array[j]=(byte) j; array[j]=(byte)j;
} }
producers[i]=createProducer(factory,destination,i,array); producers[i]=createProducer(factory,destination,i,array);
} }
@ -118,7 +114,8 @@ public class SimpleTopicTest extends TestCase{
return answer; return answer;
} }
protected PerfProducer createProducer(ConnectionFactory fac,Destination dest,int number, byte[] payload) throws JMSException{ protected PerfProducer createProducer(ConnectionFactory fac,Destination dest,int number,byte[] payload)
throws JMSException{
return new PerfProducer(fac,dest,payload); return new PerfProducer(fac,dest,payload);
} }
@ -129,29 +126,25 @@ public class SimpleTopicTest extends TestCase{
protected void configureBroker(BrokerService answer) throws Exception{ protected void configureBroker(BrokerService answer) throws Exception{
answer.addConnector(bindAddress); answer.addConnector(bindAddress);
answer.setDeleteAllMessagesOnStartup(true); answer.setDeleteAllMessagesOnStartup(true);
} }
protected ActiveMQConnectionFactory createConnectionFactory() throws Exception{ protected ActiveMQConnectionFactory createConnectionFactory() throws Exception{
return new ActiveMQConnectionFactory(bindAddress); return new ActiveMQConnectionFactory(bindAddress);
} }
public void testPerformance() throws JMSException, InterruptedException{ public void testPerformance() throws JMSException,InterruptedException{
for(int i=0;i<NUMBER_OF_CONSUMERS;i++){ for(int i=0;i<NUMBER_OF_CONSUMERS;i++){
consumers[i].start(); consumers[i].start();
} }
for(int i=0;i<NUMBER_OF_PRODUCERS;i++){ for(int i=0;i<NUMBER_OF_PRODUCERS;i++){
producers[i].start(); producers[i].start();
} }
log.info("Sampling performance "+SAMPLE_COUNT+" times at a "+SAMPLE_INTERVAL+" ms interval.");
log.info("Sampling performance "+SAMPLE_COUNT+" times at a "+SAMPLE_INTERVAL+" ms interval."); for(int i=0;i<SAMPLE_COUNT;i++){
for(int i=0; i < SAMPLE_COUNT; i++){ Thread.sleep(SAMPLE_INTERVAL);
Thread.sleep(SAMPLE_INTERVAL);
dumpProducerRate(); dumpProducerRate();
dumpConsumerRate(); dumpConsumerRate();
} }
for(int i=0;i<NUMBER_OF_PRODUCERS;i++){ for(int i=0;i<NUMBER_OF_PRODUCERS;i++){
producers[i].stop(); producers[i].stop();
} }
@ -164,11 +157,11 @@ public class SimpleTopicTest extends TestCase{
int totalRate=0; int totalRate=0;
int totalCount=0; int totalCount=0;
for(int i=0;i<producers.length;i++){ for(int i=0;i<producers.length;i++){
PerfRate rate = producers[i].getRate().cloneAndReset(); PerfRate rate=producers[i].getRate().cloneAndReset();
totalRate+=rate.getRate(); totalRate+=rate.getRate();
totalCount+=rate.getTotalCount(); totalCount+=rate.getTotalCount();
} }
int avgRate = totalRate/producers.length; int avgRate=totalRate/producers.length;
log.info("Avg producer rate = "+avgRate+" msg/sec | Total rate = "+totalRate+", sent = "+totalCount); log.info("Avg producer rate = "+avgRate+" msg/sec | Total rate = "+totalRate+", sent = "+totalCount);
} }
@ -176,11 +169,13 @@ public class SimpleTopicTest extends TestCase{
int totalRate=0; int totalRate=0;
int totalCount=0; int totalCount=0;
for(int i=0;i<consumers.length;i++){ for(int i=0;i<consumers.length;i++){
PerfRate rate = consumers[i].getRate().cloneAndReset(); PerfRate rate=consumers[i].getRate().cloneAndReset();
totalRate+=rate.getRate(); totalRate+=rate.getRate();
totalCount+=rate.getTotalCount(); totalCount+=rate.getTotalCount();
} }
int avgRate = totalRate/consumers.length; if(consumers!=null&&consumers.length>0){
log.info("Avg consumer rate = "+avgRate+" msg/sec | Total rate = "+totalRate+", received = "+totalCount); int avgRate=totalRate/consumers.length;
log.info("Avg consumer rate = "+avgRate+" msg/sec | Total rate = "+totalRate+", received = "+totalCount);
}
} }
} }

View File

@ -25,7 +25,7 @@ import javax.jms.Message;
* @version $Revision: 1.3 $ * @version $Revision: 1.3 $
*/ */
public class SlowConsumer extends PerfConsumer{ public class SlowConsumer extends PerfConsumer{
public SlowConsumer(ConnectionFactory fac,Destination dest,String consumerName,boolean slowConsumer) public SlowConsumer(ConnectionFactory fac,Destination dest,String consumerName)
throws JMSException{ throws JMSException{
super(fac,dest,consumerName); super(fac,dest,consumerName);
} }
@ -36,6 +36,7 @@ public class SlowConsumer extends PerfConsumer{
public void onMessage(Message msg){ public void onMessage(Message msg){
super.onMessage(msg); super.onMessage(msg);
System.err.println("GOT A MSG " + msg);
try{ try{
Thread.sleep(10000); Thread.sleep(10000);
}catch(InterruptedException e){ }catch(InterruptedException e){

View File

@ -1,52 +1,46 @@
/** /**
* *
* Licensed to the Apache Software Foundation (ASF) under one or more * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE
* contributor license agreements. See the NOTICE file distributed with * file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file
* this work for additional information regarding copyright ownership. * to You under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the
* The ASF licenses this file to You under the Apache License, Version 2.0 * License. You may obtain a copy of the License at
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
* *
* http://www.apache.org/licenses/LICENSE-2.0 * http://www.apache.org/licenses/LICENSE-2.0
* *
* Unless required by applicable law or agreed to in writing, software * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
* distributed under the License is distributed on an "AS IS" BASIS, * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. * specific language governing permissions and limitations under the License.
* See the License for the specific language governing permissions and
* limitations under the License.
*/ */
package org.apache.activemq.perf; package org.apache.activemq.perf;
import javax.jms.BytesMessage;
import javax.jms.Connection;
import javax.jms.ConnectionFactory; import javax.jms.ConnectionFactory;
import javax.jms.DeliveryMode;
import javax.jms.Destination; import javax.jms.Destination;
import javax.jms.JMSException; import javax.jms.JMSException;
import javax.jms.Session;
import junit.framework.TestCase;
import org.apache.activemq.ActiveMQConnectionFactory; import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.activemq.ActiveMQPrefetchPolicy;
import org.apache.activemq.broker.BrokerService; import org.apache.activemq.broker.BrokerService;
import org.apache.activemq.xbean.BrokerFactoryBean; import org.apache.activemq.xbean.BrokerFactoryBean;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.springframework.core.io.ClassPathResource; import org.springframework.core.io.ClassPathResource;
import org.springframework.core.io.Resource; import org.springframework.core.io.Resource;
/** /**
* @version $Revision: 1.3 $ * @version $Revision: 1.3 $
*/ */
public class SlowConsumerTopicTest extends SimpleTopicTest{ public class SlowConsumerTopicTest extends SimpleTopicTest{
protected PerfConsumer[] slowConsumers; protected PerfConsumer[] slowConsumers;
protected int NUMBER_OF_SLOW_CONSUMERS=1; protected int NUMBER_OF_SLOW_CONSUMERS=1;
protected void setUp() throws Exception{ protected void setUp() throws Exception{
NUMBER_OF_CONSUMERS=0;
PAYLOAD_SIZE=10 * 1024;
super.setUp(); super.setUp();
slowConsumers=new SlowConsumer[NUMBER_OF_SLOW_CONSUMERS]; slowConsumers=new SlowConsumer[NUMBER_OF_SLOW_CONSUMERS];
for(int i=0;i<NUMBER_OF_SLOW_CONSUMERS;i++){ for(int i=0;i<NUMBER_OF_SLOW_CONSUMERS;i++){
consumers[i]=createSlowConsumer(factory,destination,i); slowConsumers[i]=createSlowConsumer(factory,destination,i);
consumers[i].start(); slowConsumers[i].start();
} }
} }
@ -54,12 +48,27 @@ public class SlowConsumerTopicTest extends SimpleTopicTest{
return new SlowConsumer(fac,dest); return new SlowConsumer(fac,dest);
} }
protected PerfProducer createProducer(ConnectionFactory fac,Destination dest,int number,byte[] payload)
throws JMSException{
PerfProducer result=super.createProducer(fac,dest,number,payload);
result.setDeliveryMode(DeliveryMode.NON_PERSISTENT);
return result;
}
protected BrokerService createBroker() throws Exception{ protected BrokerService createBroker() throws Exception{
Resource resource=new ClassPathResource("org/apache/activemq/perf/slowConsumerBroker.xml"); Resource resource=new ClassPathResource("org/apache/activemq/perf/slowConsumerBroker.xml");
BrokerFactoryBean factory=new BrokerFactoryBean(resource); BrokerFactoryBean factory=new BrokerFactoryBean(resource);
factory.afterPropertiesSet(); factory.afterPropertiesSet();
BrokerService broker =factory.getBroker(); BrokerService broker=factory.getBroker();
broker.start(); broker.start();
return broker; return broker;
} }
protected ActiveMQConnectionFactory createConnectionFactory() throws Exception{
ActiveMQConnectionFactory result = super.createConnectionFactory();
ActiveMQPrefetchPolicy policy = new ActiveMQPrefetchPolicy();
policy.setTopicPrefetch(1000);
result.setPrefetchPolicy(policy);
return result;
}
} }