From b0b602121e3aa4e911cd40282f5d2ad9d4e07dd7 Mon Sep 17 00:00:00 2001 From: Robert Davies Date: Tue, 20 Feb 2007 14:01:55 +0000 Subject: [PATCH] set the prefetch to something sensible (this is actually a key bit!) git-svn-id: https://svn.apache.org/repos/asf/activemq/trunk@509566 13f79535-47bb-0310-9956-ffa450edef68 --- .../bugs/JmsDurableTopicSlowReceiveTest.java | 275 ++++++++---------- 1 file changed, 123 insertions(+), 152 deletions(-) diff --git a/activemq-core/src/test/java/org/apache/activemq/bugs/JmsDurableTopicSlowReceiveTest.java b/activemq-core/src/test/java/org/apache/activemq/bugs/JmsDurableTopicSlowReceiveTest.java index 10d1c5ab6b..5b2765108c 100755 --- a/activemq-core/src/test/java/org/apache/activemq/bugs/JmsDurableTopicSlowReceiveTest.java +++ b/activemq-core/src/test/java/org/apache/activemq/bugs/JmsDurableTopicSlowReceiveTest.java @@ -1,24 +1,21 @@ /** - * - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * + * + * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE + * file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file + * to You under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the + * License. You may obtain a copy of the License at + * * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. + * + * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on + * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the + * specific language governing permissions and limitations under the License. */ package org.apache.activemq.bugs; import java.io.File; +import java.util.Properties; import javax.jms.BytesMessage; import javax.jms.Connection; import javax.jms.Destination; @@ -37,164 +34,138 @@ import org.apache.activemq.test.JmsTopicSendReceiveTest; /** * @version $Revision: 1.5 $ */ -public class JmsDurableTopicSlowReceiveTest extends JmsTopicSendReceiveTest { - private static final org.apache.commons.logging.Log log = org.apache.commons.logging.LogFactory - .getLog(JmsDurableTopicSlowReceiveTest.class); +public class JmsDurableTopicSlowReceiveTest extends JmsTopicSendReceiveTest{ - protected Connection connection2; + private static final org.apache.commons.logging.Log log=org.apache.commons.logging.LogFactory + .getLog(JmsDurableTopicSlowReceiveTest.class); + protected Connection connection2; + protected Session session2; + protected Session consumeSession2; + protected MessageConsumer consumer2; + protected MessageProducer producer2; + protected Destination consumerDestination2; + BrokerService broker; + final int NMSG=100; + final int MSIZE=256000; + private Connection connection3; + private Session consumeSession3; + private TopicSubscriber consumer3; - protected Session session2; + /** + * Set up a durable suscriber test. + * + * @see junit.framework.TestCase#setUp() + */ + protected void setUp() throws Exception{ + this.durable=true; + broker=createBroker(); + super.setUp(); + } - protected Session consumeSession2; + protected void tearDown() throws Exception{ + super.tearDown(); + broker.stop(); + } - protected MessageConsumer consumer2; - - protected MessageProducer producer2; - - protected Destination consumerDestination2; - - - final int NMSG = 100; - - final int MSIZE = 256000; - - - private Connection connection3; - - private Session consumeSession3; - - private TopicSubscriber consumer3; - - /** - * Set up a durable suscriber test. - * - * @see junit.framework.TestCase#setUp() - */ - protected void setUp() throws Exception { - this.durable = true; - createBroker(); - super.setUp(); - } - - protected ActiveMQConnectionFactory createConnectionFactory() throws Exception { - ActiveMQConnectionFactory result = new ActiveMQConnectionFactory("vm://localhost"); - + protected ActiveMQConnectionFactory createConnectionFactory() throws Exception{ + ActiveMQConnectionFactory result=new ActiveMQConnectionFactory("vm://localhost"); + Properties props=new Properties(); + props.put("prefetchPolicy.durableTopicPrefetch","5"); + props.put("prefetchPolicy.optimizeDurableTopicPrefetch","5"); + result.setProperties(props); return result; } - + protected BrokerService createBroker() throws Exception{ BrokerService answer=new BrokerService(); configureBroker(answer); answer.start(); return answer; } - + protected void configureBroker(BrokerService answer) throws Exception{ - //KahaPersistenceAdapter adapter = new KahaPersistenceAdapter(new File("activemq-data/durableTest")); - //answer.setPersistenceAdapter(adapter); + //KahaPersistenceAdapter adapter=new KahaPersistenceAdapter(new File("activemq-data/durableTest")); //JDBCPersistenceAdapter adapter = new JDBCPersistenceAdapter(); - //answer.setPersistenceAdapter(adapter); + // answer.setPersistenceAdapter(adapter); answer.setDeleteAllMessagesOnStartup(true); } - - - /** - * Test if all the messages sent are being received. - * - * @throws Exception - */ - public void testSlowReceiver() throws Exception { - connection2 = createConnection(); - connection2.setClientID("test"); - connection2.start(); - consumeSession2 = connection2.createSession(false, - Session.AUTO_ACKNOWLEDGE); - session2 = connection2.createSession(false, Session.AUTO_ACKNOWLEDGE); - consumerDestination2 = session2.createTopic(getConsumerSubject() + "2"); - consumer2 = consumeSession2.createDurableSubscriber( - (Topic) consumerDestination2, getName()); - Thread.sleep(1000); - consumer2.close(); - connection2.close(); + /** + * Test if all the messages sent are being received. + * + * @throws Exception + */ + public void testSlowReceiver() throws Exception{ + connection2=createConnection(); + connection2.setClientID("test"); + connection2.start(); + consumeSession2=connection2.createSession(false,Session.AUTO_ACKNOWLEDGE); + session2=connection2.createSession(false,Session.AUTO_ACKNOWLEDGE); + consumerDestination2=session2.createTopic(getConsumerSubject()+"2"); + consumer2=consumeSession2.createDurableSubscriber((Topic)consumerDestination2,getName()); + Thread.sleep(1000); + consumer2.close(); + connection2.close(); + new Thread(new Runnable(){ - new Thread(new Runnable() { - public void run() { - try { - for (int loop = 0; loop < 4; loop++) { - connection2 = createConnection(); - connection2.start(); - session2 = connection2.createSession(false, - Session.AUTO_ACKNOWLEDGE); - producer2 = session2.createProducer(null); - producer2.setDeliveryMode(deliveryMode); - - Thread.sleep(1000); - - for (int i = 0; i < NMSG / 4; i++) { - BytesMessage message = session2 - .createBytesMessage(); - message.writeBytes(new byte[MSIZE]); - message.setStringProperty("test", "test"); - message.setJMSType("test"); - producer2.send(consumerDestination2, message); - Thread.sleep(50); - System.err.println("Sent(" + loop +"): " + i); - - } - producer2.close(); - connection2.stop(); - connection2.close(); - - - } - } catch (Throwable e) { + public void run(){ + try{ + for(int loop=0;loop<4;loop++){ + connection2=createConnection(); + connection2.start(); + session2=connection2.createSession(false,Session.AUTO_ACKNOWLEDGE); + producer2=session2.createProducer(null); + producer2.setDeliveryMode(deliveryMode); + Thread.sleep(1000); + for(int i=0;i