();
-
- // Constructors --------------------------------------------------
-
- // Public --------------------------------------------------------
-
- @Override
- @Before
- public void setUp() throws Exception
- {
- super.setUp();
-
- // ServerManagement.start("all");
-
- ic = getInitialContext();
-
- createTopic("StressTestTopic");
- }
-
- @Override
- @After
- public void tearDown() throws Exception
- {
- destroyTopic("StressTestTopic");
- ic.close();
- super.tearDown();
- }
-
- @Test
- public void testManyConnections() throws Exception
- {
- ConnectionFactory cf = (ConnectionFactory) ic.lookup("/ConnectionFactory");
-
- Topic topic = (Topic) ic.lookup("/topic/StressTestTopic");
-
- Connection[] conns = new Connection[ManyConnectionsStressTest.NUM_CONNECTIONS];
-
- for (int i = 0; i < ManyConnectionsStressTest.NUM_CONNECTIONS; i++)
- {
- conns[i] = addConnection(cf.createConnection());
-
- Session sess = conns[i].createSession(false, Session.AUTO_ACKNOWLEDGE);
-
- MessageConsumer cons = sess.createConsumer(topic);
-
- MyListener listener = new MyListener();
-
- synchronized (listeners)
- {
- listeners.add(listener);
- }
-
- cons.setMessageListener(listener);
-
- conns[i].start();
-
- log.info("Created " + i);
- }
-
- // Thread.sleep(100 * 60 * 1000);
-
- Connection connSend = addConnection(cf.createConnection());
-
- Session sessSend = connSend.createSession(false, Session.AUTO_ACKNOWLEDGE);
-
- MessageProducer prod = sessSend.createProducer(topic);
-
- for (int i = 0; i < ManyConnectionsStressTest.NUM_MESSAGES; i++)
- {
- TextMessage tm = sessSend.createTextMessage("message" + i);
-
- tm.setIntProperty("count", i);
-
- prod.send(tm);
- }
-
- long wait = 30000;
-
- synchronized (listeners)
- {
- while (!listeners.isEmpty() && wait > 0)
- {
- long start = System.currentTimeMillis();
- try
- {
- listeners.wait(wait);
- }
- catch (InterruptedException e)
- {
- // Ignore
- }
- wait -= System.currentTimeMillis() - start;
- }
- }
-
- if (wait <= 0)
- {
- ProxyAssertSupport.fail("Timed out");
- }
-
- ProxyAssertSupport.assertFalse(failed);
- }
-
- private void finished(final MyListener listener)
- {
- synchronized (listeners)
- {
- log.info("consumer " + listener + " has finished");
-
- listeners.remove(listener);
-
- listeners.notify();
- }
- }
-
- private void failed(final MyListener listener)
- {
- synchronized (listeners)
- {
- log.error("consumer " + listener + " has failed");
-
- listeners.remove(listener);
-
- failed = true;
-
- listeners.notify();
- }
- }
-
- private final class MyListener implements MessageListener
- {
- public void onMessage(final Message msg)
- {
- try
- {
- int count = msg.getIntProperty("count");
-
- // log.info(this + " got message " + msg);
-
- if (count == ManyConnectionsStressTest.NUM_MESSAGES - 1)
- {
- finished(this);
- }
- }
- catch (JMSException e)
- {
- log.error("Failed to get int property", e);
-
- failed(this);
- }
- }
-
- }
-}
diff --git a/tests/jms-tests/src/test/java/org/apache/activemq/jms/tests/stress/OpenCloseStressTest.java b/tests/jms-tests/src/test/java/org/apache/activemq/jms/tests/stress/OpenCloseStressTest.java
deleted file mode 100644
index 29d5f8f885..0000000000
--- a/tests/jms-tests/src/test/java/org/apache/activemq/jms/tests/stress/OpenCloseStressTest.java
+++ /dev/null
@@ -1,445 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.activemq.jms.tests.stress;
-import javax.jms.Connection;
-import javax.jms.ConnectionFactory;
-import javax.jms.DeliveryMode;
-import javax.jms.Destination;
-import javax.jms.JMSException;
-import javax.jms.Message;
-import javax.jms.MessageConsumer;
-import javax.jms.MessageListener;
-import javax.jms.MessageProducer;
-import javax.jms.Session;
-import javax.jms.TextMessage;
-import javax.jms.Topic;
-import javax.naming.InitialContext;
-
-import org.apache.activemq.jms.tests.ActiveMQServerTestCase;
-import org.apache.activemq.jms.tests.util.ProxyAssertSupport;
-import org.apache.activemq.utils.UUIDGenerator;
-import org.junit.After;
-import org.junit.Before;
-import org.junit.BeforeClass;
-import org.junit.Test;
-
-/**
- * A OpenCloseStressTest.
- *
- * This stress test starts several publisher connections and several subscriber connections, then
- * sends and consumes messages while concurrently closing the sessions.
- *
- * This test will help catch race conditions that occurred with rapid open/closing of sessions when
- * messages are being sent/received
- *
- * E.g. http://jira.jboss.com/jira/browse/JBMESSAGING-982
- * @author Tim Fox
- */
-public class OpenCloseStressTest extends ActiveMQServerTestCase
-{
- @BeforeClass
- public static void stressTestsEnabled()
- {
- org.junit.Assume.assumeTrue(JMSStressTestBase.STRESS_TESTS_ENABLED);
- }
-
- InitialContext ic;
-
- ConnectionFactory cf;
-
- Topic topic;
-
- @Override
- @Before
- public void setUp() throws Exception
- {
- super.setUp();
-
- // ServerManagement.start("all");
-
- ic = getInitialContext();
- cf = (ConnectionFactory)ic.lookup("/ConnectionFactory");
-
- destroyTopic("TestTopic");
- createTopic("TestTopic");
-
- topic = (Topic)ic.lookup("topic/TestTopic");
-
- log.debug("setup done");
- }
-
- @Override
- @After
- public void tearDown() throws Exception
- {
- destroyQueue("TestQueue");
- log.debug("tear down done");
- }
-
- @Test
- public void testOpenClose() throws Exception
- {
- Connection conn1 = null;
- Connection conn2 = null;
- Connection conn3 = null;
-
- Connection conn4 = null;
- Connection conn5 = null;
- Connection conn6 = null;
- Connection conn7 = null;
- Connection conn8 = null;
-
- try
- {
- Publisher[] publishers = new Publisher[3];
-
- final int MSGS_PER_PUBLISHER = 10000;
-
- conn1 = cf.createConnection();
- Session sess1 = conn1.createSession(false, Session.AUTO_ACKNOWLEDGE);
- MessageProducer prod1 = sess1.createProducer(topic);
- prod1.setDeliveryMode(DeliveryMode.PERSISTENT);
- publishers[0] = new Publisher(sess1, prod1, MSGS_PER_PUBLISHER, 2);
-
- conn2 = cf.createConnection();
- Session sess2 = conn2.createSession(false, Session.AUTO_ACKNOWLEDGE);
- MessageProducer prod2 = sess2.createProducer(topic);
- prod2.setDeliveryMode(DeliveryMode.PERSISTENT);
- publishers[1] = new Publisher(sess2, prod2, MSGS_PER_PUBLISHER, 5);
-
- conn3 = cf.createConnection();
- Session sess3 = conn3.createSession(false, Session.AUTO_ACKNOWLEDGE);
- MessageProducer prod3 = sess3.createProducer(topic);
- prod3.setDeliveryMode(DeliveryMode.PERSISTENT);
- publishers[2] = new Publisher(sess3, prod3, MSGS_PER_PUBLISHER, 1);
-
- Subscriber[] subscribers = new Subscriber[5];
-
- conn4 = cf.createConnection();
- subscribers[0] = new Subscriber(conn4, 3 * MSGS_PER_PUBLISHER, 500, 1000 * 60 * 15, topic, false);
-
- conn5 = cf.createConnection();
- subscribers[1] = new Subscriber(conn5, 3 * MSGS_PER_PUBLISHER, 2000, 1000 * 60 * 15, topic, false);
-
- conn6 = cf.createConnection();
- subscribers[2] = new Subscriber(conn6, 3 * MSGS_PER_PUBLISHER, 700, 1000 * 60 * 15, topic, false);
-
- conn7 = cf.createConnection();
- subscribers[3] = new Subscriber(conn7, 3 * MSGS_PER_PUBLISHER, 1500, 1000 * 60 * 15, topic, true);
-
- conn8 = cf.createConnection();
- subscribers[4] = new Subscriber(conn8, 3 * MSGS_PER_PUBLISHER, 1200, 1000 * 60 * 15, topic, true);
-
- Thread[] threads = new Thread[8];
-
- // subscribers
- threads[0] = new Thread(subscribers[0]);
-
- threads[1] = new Thread(subscribers[1]);
-
- threads[2] = new Thread(subscribers[2]);
-
- threads[3] = new Thread(subscribers[3]);
-
- threads[4] = new Thread(subscribers[4]);
-
- // publishers
-
- threads[5] = new Thread(publishers[0]);
-
- threads[6] = new Thread(publishers[1]);
-
- threads[7] = new Thread(publishers[2]);
-
- for (int i = 0; i < subscribers.length; i++)
- {
- threads[i].start();
- }
-
- // Pause before creating producers otherwise subscribers to make sure they're all created
-
- Thread.sleep(5000);
-
- for (int i = subscribers.length; i < threads.length; i++)
- {
- threads[i].start();
- }
-
- for (Thread thread : threads)
- {
- thread.join();
- }
-
- for (Subscriber subscriber : subscribers)
- {
- if (subscriber.isDurable())
- {
- ProxyAssertSupport.assertEquals(3 * MSGS_PER_PUBLISHER, subscriber.getMessagesReceived());
- }
- else
- {
- // Note that for a non durable subscriber the number of messages received in total
- // will be somewhat less than the total number received since when recycling the session
- // there is a period of time after closing the previous session and starting the next one
- // when messages are being sent and won't be received (since there is no consumer)
- }
-
- ProxyAssertSupport.assertFalse(subscriber.isFailed());
- }
-
- for (Publisher publisher : publishers)
- {
- ProxyAssertSupport.assertFalse(publisher.isFailed());
- }
- }
- finally
- {
- if (conn1 != null)
- {
- conn1.close();
- }
- if (conn2 != null)
- {
- conn2.close();
- }
- if (conn3 != null)
- {
- conn3.close();
- }
- if (conn4 != null)
- {
- conn4.close();
- }
- if (conn5 != null)
- {
- conn5.close();
- }
- if (conn6 != null)
- {
- conn6.close();
- }
- if (conn7 != null)
- {
- conn7.close();
- }
- if (conn8 != null)
- {
- conn8.close();
- }
- }
-
- }
-
- class Publisher implements Runnable
- {
- private final Session sess;
-
- private final int numMessages;
-
- private final int delay;
-
- private final MessageProducer prod;
-
- private boolean failed;
-
- boolean isFailed()
- {
- return failed;
- }
-
- Publisher(final Session sess, final MessageProducer prod, final int numMessages, final int delay)
- {
- this.sess = sess;
-
- this.prod = prod;
-
- this.numMessages = numMessages;
-
- this.delay = delay;
- }
-
- public void run()
- {
- try
- {
- for (int i = 0; i < numMessages; i++)
- {
- TextMessage tm = sess.createTextMessage("message" + i);
-
- prod.send(tm);
-
- try
- {
- Thread.sleep(delay);
- }
- catch (Exception ignore)
- {
- }
- }
- }
- catch (JMSException e)
- {
- log.error("Failed to send message", e);
- failed = true;
- }
- }
-
- }
-
- class Subscriber implements Runnable
- {
- private Session sess;
-
- private MessageConsumer cons;
-
- private int msgsReceived;
-
- private final int numMessages;
-
- private final int delay;
-
- private final Connection conn;
-
- private boolean failed;
-
- private final long timeout;
-
- private final Destination dest;
-
- private final boolean durable;
-
- private String subname;
-
- boolean isFailed()
- {
- return failed;
- }
-
- boolean isDurable()
- {
- return durable;
- }
-
- synchronized void msgReceived()
- {
- msgsReceived++;
- }
-
- synchronized int getMessagesReceived()
- {
- return msgsReceived;
- }
-
- class Listener implements MessageListener
- {
-
- public void onMessage(final Message msg)
- {
- msgReceived();
- }
-
- }
-
- Subscriber(final Connection conn,
- final int numMessages,
- final int delay,
- final long timeout,
- final Destination dest,
- final boolean durable) throws Exception
- {
- this.conn = conn;
-
- this.numMessages = numMessages;
-
- this.delay = delay;
-
- this.timeout = timeout;
-
- this.dest = dest;
-
- this.durable = durable;
-
- if (durable)
- {
- conn.setClientID(UUIDGenerator.getInstance().generateStringUUID());
-
- subname = UUIDGenerator.getInstance().generateStringUUID();
- }
- }
-
- public void run()
- {
- try
- {
- long start = System.currentTimeMillis();
-
- while (System.currentTimeMillis() - start < timeout && msgsReceived < numMessages)
- {
- // recycle the session
-
- recycleSession();
-
- Thread.sleep(delay);
- }
-
- // Delete the durable sub
-
- if (durable)
- {
- recycleSession();
-
- cons.close();
-
- sess.unsubscribe(subname);
- }
- }
- catch (Exception e)
- {
- log.error("Failed in subscriber", e);
- failed = true;
- }
-
- }
-
- void recycleSession() throws Exception
- {
- conn.stop();
-
- if (sess != null)
- {
- sess.close();
- }
-
- sess = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
-
- if (durable)
- {
- cons = sess.createDurableSubscriber((Topic)dest, subname);
- }
- else
- {
- cons = sess.createConsumer(dest);
- }
-
- cons.setMessageListener(new Listener());
-
- conn.start();
- }
-
- }
-
-}
diff --git a/tests/jms-tests/src/test/java/org/apache/activemq/jms/tests/stress/QueueStressTest.java b/tests/jms-tests/src/test/java/org/apache/activemq/jms/tests/stress/QueueStressTest.java
deleted file mode 100644
index 7878859818..0000000000
--- a/tests/jms-tests/src/test/java/org/apache/activemq/jms/tests/stress/QueueStressTest.java
+++ /dev/null
@@ -1,271 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.activemq.jms.tests.stress;
-
-import javax.jms.Connection;
-import javax.jms.DeliveryMode;
-import javax.jms.MessageConsumer;
-import javax.jms.MessageProducer;
-import javax.jms.Session;
-import javax.jms.XAConnection;
-import javax.jms.XASession;
-
-import org.junit.BeforeClass;
-import org.junit.Test;
-
-/**
- * A QueueStressTest.
- *
- * @author Tim Fox
- * @version $Revision: 2349 $
- */
-
-public class QueueStressTest extends JMSStressTestBase
-{
- @BeforeClass
- public static void stressTestsEnabled()
- {
- org.junit.Assume.assumeTrue(JMSStressTestBase.STRESS_TESTS_ENABLED);
- }
-
- /*
- * Stress a queue with transational, non transactional and 2pc senders sending both persistent
- * and non persistent messages
- * Transactional senders go through a cycle of sending and rolling back
- *
- */
- @Test
- public void testQueueMultipleSenders() throws Exception
- {
- Connection conn1 = cf.createConnection();
-
- Session sess1 = conn1.createSession(false, Session.AUTO_ACKNOWLEDGE);
- Session sess2 = conn1.createSession(false, Session.AUTO_ACKNOWLEDGE);
- Session sess3 = conn1.createSession(false, Session.AUTO_ACKNOWLEDGE);
- Session sess4 = conn1.createSession(false, Session.AUTO_ACKNOWLEDGE);
- Session sess5 = conn1.createSession(false, Session.AUTO_ACKNOWLEDGE);
- Session sess6 = conn1.createSession(false, Session.AUTO_ACKNOWLEDGE);
- Session sess7 = conn1.createSession(false, Session.AUTO_ACKNOWLEDGE);
- Session sess8 = conn1.createSession(false, Session.AUTO_ACKNOWLEDGE);
-
- Session sess9 = conn1.createSession(true, Session.SESSION_TRANSACTED);
- Session sess10 = conn1.createSession(true, Session.SESSION_TRANSACTED);
- Session sess11 = conn1.createSession(true, Session.SESSION_TRANSACTED);
- Session sess12 = conn1.createSession(true, Session.SESSION_TRANSACTED);
- Session sess13 = conn1.createSession(true, Session.SESSION_TRANSACTED);
- Session sess14 = conn1.createSession(true, Session.SESSION_TRANSACTED);
- Session sess15 = conn1.createSession(true, Session.SESSION_TRANSACTED);
- Session sess16 = conn1.createSession(true, Session.SESSION_TRANSACTED);
-
- XASession xaSess1 = ((XAConnection) conn1).createXASession();
- tweakXASession(xaSess1);
- XASession xaSess2 = ((XAConnection) conn1).createXASession();
- tweakXASession(xaSess2);
- XASession xaSess3 = ((XAConnection) conn1).createXASession();
- tweakXASession(xaSess3);
- XASession xaSess4 = ((XAConnection) conn1).createXASession();
- tweakXASession(xaSess4);
- XASession xaSess5 = ((XAConnection) conn1).createXASession();
- tweakXASession(xaSess5);
- XASession xaSess6 = ((XAConnection) conn1).createXASession();
- tweakXASession(xaSess6);
- XASession xaSess7 = ((XAConnection) conn1).createXASession();
- tweakXASession(xaSess7);
- XASession xaSess8 = ((XAConnection) conn1).createXASession();
- tweakXASession(xaSess8);
-
- Session sess17 = xaSess1.getSession();
- Session sess18 = xaSess2.getSession();
- Session sess19 = xaSess3.getSession();
- Session sess20 = xaSess4.getSession();
- Session sess21 = xaSess5.getSession();
- Session sess22 = xaSess6.getSession();
- Session sess23 = xaSess7.getSession();
- Session sess24 = xaSess8.getSession();
-
- MessageProducer prod1 = sess1.createProducer(destinationQueue1);
- prod1.setDeliveryMode(DeliveryMode.NON_PERSISTENT);
- MessageProducer prod2 = sess2.createProducer(destinationQueue1);
- prod2.setDeliveryMode(DeliveryMode.PERSISTENT);
- MessageProducer prod3 = sess3.createProducer(destinationQueue1);
- prod3.setDeliveryMode(DeliveryMode.NON_PERSISTENT);
- MessageProducer prod4 = sess4.createProducer(destinationQueue1);
- prod4.setDeliveryMode(DeliveryMode.PERSISTENT);
- MessageProducer prod5 = sess5.createProducer(destinationQueue1);
- prod5.setDeliveryMode(DeliveryMode.NON_PERSISTENT);
- MessageProducer prod6 = sess6.createProducer(destinationQueue1);
- prod6.setDeliveryMode(DeliveryMode.PERSISTENT);
- MessageProducer prod7 = sess7.createProducer(destinationQueue1);
- prod7.setDeliveryMode(DeliveryMode.NON_PERSISTENT);
- MessageProducer prod8 = sess8.createProducer(destinationQueue1);
- prod8.setDeliveryMode(DeliveryMode.PERSISTENT);
- MessageProducer prod9 = sess9.createProducer(destinationQueue1);
- prod9.setDeliveryMode(DeliveryMode.NON_PERSISTENT);
- MessageProducer prod10 = sess10.createProducer(destinationQueue1);
- prod10.setDeliveryMode(DeliveryMode.PERSISTENT);
- MessageProducer prod11 = sess11.createProducer(destinationQueue1);
- prod11.setDeliveryMode(DeliveryMode.NON_PERSISTENT);
- MessageProducer prod12 = sess12.createProducer(destinationQueue1);
- prod12.setDeliveryMode(DeliveryMode.PERSISTENT);
- MessageProducer prod13 = sess13.createProducer(destinationQueue1);
- prod13.setDeliveryMode(DeliveryMode.NON_PERSISTENT);
- MessageProducer prod14 = sess14.createProducer(destinationQueue1);
- prod14.setDeliveryMode(DeliveryMode.PERSISTENT);
- MessageProducer prod15 = sess15.createProducer(destinationQueue1);
- prod15.setDeliveryMode(DeliveryMode.NON_PERSISTENT);
- MessageProducer prod16 = sess16.createProducer(destinationQueue1);
- prod16.setDeliveryMode(DeliveryMode.PERSISTENT);
- MessageProducer prod17 = sess17.createProducer(destinationQueue1);
- prod17.setDeliveryMode(DeliveryMode.NON_PERSISTENT);
- MessageProducer prod18 = sess18.createProducer(destinationQueue1);
- prod18.setDeliveryMode(DeliveryMode.PERSISTENT);
- MessageProducer prod19 = sess19.createProducer(destinationQueue1);
- prod19.setDeliveryMode(DeliveryMode.NON_PERSISTENT);
- MessageProducer prod20 = sess20.createProducer(destinationQueue1);
- prod20.setDeliveryMode(DeliveryMode.PERSISTENT);
- MessageProducer prod21 = sess21.createProducer(destinationQueue1);
- prod21.setDeliveryMode(DeliveryMode.NON_PERSISTENT);
- MessageProducer prod22 = sess22.createProducer(destinationQueue1);
- prod22.setDeliveryMode(DeliveryMode.PERSISTENT);
- MessageProducer prod23 = sess23.createProducer(destinationQueue1);
- prod23.setDeliveryMode(DeliveryMode.NON_PERSISTENT);
- MessageProducer prod24 = sess24.createProducer(destinationQueue1);
- prod24.setDeliveryMode(DeliveryMode.PERSISTENT);
-
- Connection conn2 = cf.createConnection();
- conn2.start();
- Session sessReceive = conn2.createSession(false, Session.AUTO_ACKNOWLEDGE);
- MessageConsumer cons = sessReceive.createConsumer(destinationQueue1);
-
- Runner[] runners = new Runner[]{
- new Sender("prod1", sess1, prod1, JMSStressTestBase.NUM_NON_PERSISTENT_MESSAGES),
- new Sender("prod2", sess2, prod2, JMSStressTestBase.NUM_PERSISTENT_MESSAGES),
- new Sender("prod3", sess3, prod3, JMSStressTestBase.NUM_NON_PERSISTENT_MESSAGES),
- new Sender("prod4", sess4, prod4, JMSStressTestBase.NUM_PERSISTENT_MESSAGES),
- new Sender("prod5", sess5, prod5, JMSStressTestBase.NUM_NON_PERSISTENT_MESSAGES),
- new Sender("prod6", sess6, prod6, JMSStressTestBase.NUM_PERSISTENT_MESSAGES),
- new Sender("prod7", sess7, prod7, JMSStressTestBase.NUM_NON_PERSISTENT_MESSAGES),
- new Sender("prod8", sess8, prod8, JMSStressTestBase.NUM_PERSISTENT_MESSAGES),
- new TransactionalSender("prod9",
- sess9,
- prod9,
- JMSStressTestBase.NUM_NON_PERSISTENT_MESSAGES,
- 1,
- 1),
- new TransactionalSender("prod10",
- sess10,
- prod10,
- JMSStressTestBase.NUM_PERSISTENT_MESSAGES,
- 1,
- 1),
- new TransactionalSender("prod11",
- sess11,
- prod11,
- JMSStressTestBase.NUM_NON_PERSISTENT_MESSAGES,
- 10,
- 7),
- new TransactionalSender("prod12",
- sess12,
- prod12,
- JMSStressTestBase.NUM_PERSISTENT_MESSAGES,
- 10,
- 7),
- new TransactionalSender("prod13",
- sess13,
- prod13,
- JMSStressTestBase.NUM_NON_PERSISTENT_MESSAGES,
- 50,
- 21),
- new TransactionalSender("prod14",
- sess14,
- prod14,
- JMSStressTestBase.NUM_PERSISTENT_MESSAGES,
- 50,
- 21),
- new TransactionalSender("prod15",
- sess15,
- prod15,
- JMSStressTestBase.NUM_NON_PERSISTENT_MESSAGES,
- 100,
- 67),
- new TransactionalSender("prod16",
- sess16,
- prod16,
- JMSStressTestBase.NUM_PERSISTENT_MESSAGES,
- 100,
- 67),
- new Transactional2PCSender("prod17",
- xaSess1,
- prod17,
- JMSStressTestBase.NUM_NON_PERSISTENT_MESSAGES,
- 1,
- 1),
- new Transactional2PCSender("prod18",
- xaSess2,
- prod18,
- JMSStressTestBase.NUM_PERSISTENT_MESSAGES,
- 1,
- 1),
- new Transactional2PCSender("prod19",
- xaSess3,
- prod19,
- JMSStressTestBase.NUM_NON_PERSISTENT_MESSAGES,
- 10,
- 7),
- new Transactional2PCSender("prod20",
- xaSess4,
- prod20,
- JMSStressTestBase.NUM_PERSISTENT_MESSAGES,
- 10,
- 7),
- new Transactional2PCSender("prod21",
- xaSess5,
- prod21,
- JMSStressTestBase.NUM_NON_PERSISTENT_MESSAGES,
- 50,
- 21),
- new Transactional2PCSender("prod22",
- xaSess6,
- prod22,
- JMSStressTestBase.NUM_PERSISTENT_MESSAGES,
- 50,
- 21),
- new Transactional2PCSender("prod23",
- xaSess7,
- prod23,
- JMSStressTestBase.NUM_NON_PERSISTENT_MESSAGES,
- 100,
- 67),
- new Transactional2PCSender("prod24",
- xaSess8,
- prod24,
- JMSStressTestBase.NUM_PERSISTENT_MESSAGES,
- 100,
- 67),
- new Receiver(sessReceive,
- cons,
- 12 * JMSStressTestBase.NUM_NON_PERSISTENT_MESSAGES + 12 * JMSStressTestBase.NUM_PERSISTENT_MESSAGES,
- false)};
-
- runRunners(runners);
-
- conn1.close();
-
- conn2.close();
- }
-
-}
diff --git a/tests/jms-tests/src/test/java/org/apache/activemq/jms/tests/stress/Receiver.java b/tests/jms-tests/src/test/java/org/apache/activemq/jms/tests/stress/Receiver.java
deleted file mode 100644
index 1a4c6b4760..0000000000
--- a/tests/jms-tests/src/test/java/org/apache/activemq/jms/tests/stress/Receiver.java
+++ /dev/null
@@ -1,297 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.activemq.jms.tests.stress;
-
-import java.util.HashMap;
-import java.util.Map;
-
-import javax.jms.Connection;
-import javax.jms.ConnectionConsumer;
-import javax.jms.Destination;
-import javax.jms.JMSException;
-import javax.jms.Message;
-import javax.jms.MessageConsumer;
-import javax.jms.MessageListener;
-import javax.jms.ServerSession;
-import javax.jms.ServerSessionPool;
-import javax.jms.Session;
-
-import org.apache.activemq.jms.tests.JmsTestLogger;
-
-/**
- * Receives messages from a destination for stress testing
- * @author Tim Fox
- */
-public class Receiver extends Runner implements MessageListener
-{
- private static final JmsTestLogger log = JmsTestLogger.LOGGER;
-
- private static final long RECEIVE_TIMEOUT = 120000;
-
- protected MessageConsumer cons;
-
- protected int count;
-
- protected boolean isListener;
-
- protected Map counts = new HashMap();
-
- protected boolean isCC;
-
- protected Connection conn;
-
- protected ConnectionConsumer cc;
-
- private final Object lock1 = new Object();
-
- private final Object lock2 = new Object();
-
- private Message theMessage;
-
- private boolean finished;
-
- public Receiver(final Connection conn, final Session sess, final int numMessages, final Destination dest) throws Exception
- {
- super(sess, numMessages);
-
- isListener = true;
-
- isCC = true;
-
- sess.setMessageListener(this);
-
- cc = conn.createConnectionConsumer(dest, null, new MockServerSessionPool(sess), 10);
-
- }
-
- public Receiver(final Session sess, final MessageConsumer cons, final int numMessages, final boolean isListener) throws Exception
- {
- super(sess, numMessages);
- this.cons = cons;
- this.isListener = isListener;
- if (this.isListener)
- {
- cons.setMessageListener(this);
- }
- }
-
- private boolean done;
-
- public void onMessage(final Message m)
- {
- try
- {
- synchronized (lock1)
- {
- theMessage = m;
-
- lock1.notify();
- }
-
- // Wait for message to be processed
- synchronized (lock2)
- {
- while (!done && !finished)
- {
- lock2.wait();
- }
- done = false;
- }
-
- }
- catch (Exception e)
- {
- Receiver.log.error("Failed to put in channel", e);
- setFailed(true);
- }
- }
-
- protected void finished()
- {
- synchronized (lock2)
- {
- finished = true;
- lock2.notify();
- }
- }
-
- protected Message getMessage() throws Exception
- {
- Message m;
-
- if (isListener)
- {
- synchronized (lock1)
- {
- long start = System.currentTimeMillis();
- long waitTime = Receiver.RECEIVE_TIMEOUT;
- while (theMessage == null && waitTime >= 0)
- {
- lock1.wait(waitTime);
-
- waitTime = Receiver.RECEIVE_TIMEOUT - (System.currentTimeMillis() - start);
- }
- m = theMessage;
- theMessage = null;
- }
- }
- else
- {
- m = cons.receive(Receiver.RECEIVE_TIMEOUT);
- }
-
- return m;
- }
-
- protected void processingDone()
- {
- if (isListener)
- {
- synchronized (lock2)
- {
- done = true;
- lock2.notify();
- }
- }
- }
-
- @Override
- public void run()
- {
-
- // Small pause so as not to miss any messages in a topic
- try
- {
- Thread.sleep(1000);
- }
- catch (InterruptedException e)
- {
- }
-
- try
- {
- String prodName = null;
- Integer msgCount = null;
-
- while (count < numMessages)
- {
- Message m = getMessage();
-
- if (m == null)
- {
- Receiver.log.error("Message is null");
- setFailed(true);
- processingDone();
- return;
- }
-
- prodName = m.getStringProperty("PROD_NAME");
- msgCount = new Integer(m.getIntProperty("MSG_NUMBER"));
-
- // log.info(this + " Got message " + prodName + ":" + msgCount + "M: " + m.getJMSMessageID());
-
- Integer prevCount = (Integer)counts.get(prodName);
- if (prevCount == null)
- {
- if (msgCount.intValue() != 0)
- {
- Receiver.log.error("First message received not zero");
- setFailed(true);
- processingDone();
- return;
- }
- }
- else
- {
- if (prevCount.intValue() != msgCount.intValue() - 1)
- {
- Receiver.log.error("Message out of sequence for " + prodName +
- ", expected:" +
- (prevCount.intValue() + 1) +
- " got " +
- msgCount);
- setFailed(true);
- processingDone();
- return;
- }
- }
- counts.put(prodName, msgCount);
-
- count++;
-
- processingDone();
- }
-
- }
- catch (Exception e)
- {
- Receiver.log.error("Failed to receive message", e);
- setFailed(true);
- }
- finally
- {
- if (cc != null)
- {
- try
- {
- cc.close();
- }
- catch (JMSException e)
- {
- Receiver.log.error("Failed to close connection consumer", e);
- }
- }
- }
- }
-
- static final class MockServerSessionPool implements ServerSessionPool
- {
- private final ServerSession serverSession;
-
- MockServerSessionPool(final Session sess)
- {
- serverSession = new MockServerSession(sess);
- }
-
- public ServerSession getServerSession() throws JMSException
- {
- return serverSession;
- }
- }
-
- static final class MockServerSession implements ServerSession
- {
- Session session;
-
- MockServerSession(final Session sess)
- {
- session = sess;
- }
-
- public Session getSession() throws JMSException
- {
- return session;
- }
-
- public void start() throws JMSException
- {
- session.run();
- }
-
- }
-
-}
diff --git a/tests/jms-tests/src/test/java/org/apache/activemq/jms/tests/stress/RecoveringReceiver.java b/tests/jms-tests/src/test/java/org/apache/activemq/jms/tests/stress/RecoveringReceiver.java
deleted file mode 100644
index eb2e4ea050..0000000000
--- a/tests/jms-tests/src/test/java/org/apache/activemq/jms/tests/stress/RecoveringReceiver.java
+++ /dev/null
@@ -1,195 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.activemq.jms.tests.stress;
-
-import org.apache.activemq.jms.tests.JmsTestLogger;
-
-import javax.jms.Message;
-import javax.jms.MessageConsumer;
-import javax.jms.Session;
-
-/**
- *
- * A RecoveringReceiver.
- *
- * A Receiver that receives messages from a destination and alternately
- * acknowledges and recovers the session.
- * Must be used with ack mode CLIENT_ACKNOWLEDGE
- *
- *
- * @author Tim Fox
- *
- */
-public class RecoveringReceiver extends Receiver
-{
- private static final JmsTestLogger log = JmsTestLogger.LOGGER;
-
- protected int ackSize;
-
- protected int recoverSize;
-
- class Count
- {
- int lastAcked;
-
- int lastReceived;
- }
-
- public RecoveringReceiver(final Session sess,
- final MessageConsumer cons,
- final int numMessages,
- final int ackSize,
- final int recoverSize,
- final boolean isListener) throws Exception
- {
- super(sess, cons, numMessages, isListener);
- this.ackSize = ackSize;
- this.recoverSize = recoverSize;
- }
-
- @Override
- public void run()
- {
- // Small pause so as not to miss any messages in a topic
- try
- {
- Thread.sleep(1000);
- }
- catch (InterruptedException e)
- {
- }
-
- try
- {
- int iterations = numMessages / ackSize;
-
- for (int outerCount = 0; outerCount < iterations; outerCount++)
- {
- Message m = null;
- for (int innerCount = 0; innerCount < ackSize; innerCount++)
- {
- m = getMessage();
-
- if (m == null)
- {
- RecoveringReceiver.log.error("Message is null");
- setFailed(true);
- return;
- }
- String prodName = m.getStringProperty("PROD_NAME");
- Integer msgCount = new Integer(m.getIntProperty("MSG_NUMBER"));
-
- Count count = (Count)counts.get(prodName);
- if (count == null)
- {
- // First time
- if (msgCount.intValue() != 0)
- {
- RecoveringReceiver.log.error("First message from " + prodName + " is not 0, it is " + msgCount);
- setFailed(true);
- return;
- }
- else
- {
- count = new Count();
- counts.put(prodName, count);
- }
- }
- else
- {
- if (count.lastAcked != msgCount.intValue() - 1)
- {
- RecoveringReceiver.log.error("Message out of sequence for " + prodName +
- ", expected " +
- (count.lastAcked + 1));
- setFailed(true);
- return;
- }
- }
- count.lastAcked = msgCount.intValue();
-
- count.lastReceived = msgCount.intValue();
-
- if (innerCount == ackSize - 1)
- {
- m.acknowledge();
- }
- processingDone();
-
- }
-
- if (outerCount == iterations - 1)
- {
- break;
- }
-
- for (int innerCount = 0; innerCount < recoverSize; innerCount++)
- {
- m = getMessage();
-
- if (m == null)
- {
- RecoveringReceiver.log.error("Message is null");
- return;
- }
- String prodName = m.getStringProperty("PROD_NAME");
- Integer msgCount = new Integer(m.getIntProperty("MSG_NUMBER"));
-
- Count count = (Count)counts.get(prodName);
- if (count == null)
- {
- // First time
- if (msgCount.intValue() != 0)
- {
- RecoveringReceiver.log.error("First message from " + prodName + " is not 0, it is " + msgCount);
- setFailed(true);
- return;
- }
- else
- {
- count = new Count();
- count.lastAcked = -1;
- counts.put(prodName, count);
- }
- }
- else
- {
- if (count.lastReceived != msgCount.intValue() - 1)
- {
- RecoveringReceiver.log.error("Message out of sequence");
- setFailed(true);
- return;
- }
- }
- count.lastReceived = msgCount.intValue();
-
- if (innerCount == recoverSize - 1)
- {
- sess.recover();
- }
- processingDone();
- }
- }
- }
- catch (Exception e)
- {
- RecoveringReceiver.log.error("Failed to receive message", e);
- setFailed(true);
- }
- }
-
-}
diff --git a/tests/jms-tests/src/test/java/org/apache/activemq/jms/tests/stress/RelayStressTest.java b/tests/jms-tests/src/test/java/org/apache/activemq/jms/tests/stress/RelayStressTest.java
deleted file mode 100644
index 04cd83030c..0000000000
--- a/tests/jms-tests/src/test/java/org/apache/activemq/jms/tests/stress/RelayStressTest.java
+++ /dev/null
@@ -1,256 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.activemq.jms.tests.stress;
-import javax.jms.Connection;
-import javax.jms.ConnectionFactory;
-import javax.jms.DeliveryMode;
-import javax.jms.JMSException;
-import javax.jms.Message;
-import javax.jms.MessageConsumer;
-import javax.jms.MessageListener;
-import javax.jms.MessageProducer;
-import javax.jms.Session;
-import javax.jms.Topic;
-import javax.naming.InitialContext;
-
-import org.apache.activemq.jms.tests.ActiveMQServerTestCase;
-import org.apache.activemq.jms.tests.JmsTestLogger;
-import org.apache.activemq.jms.tests.util.ProxyAssertSupport;
-import org.junit.After;
-import org.junit.Before;
-import org.junit.BeforeClass;
-import org.junit.Test;
-
-/**
- * Send messages to a topic with selector1, consumer them with multiple consumers and relay them
- * back to the topic with a different selector, then consume that with more consumers.
- *
- * @author Tim Fox
- *
- *
- */
-public class RelayStressTest extends ActiveMQServerTestCase
-{
- @BeforeClass
- public static void stressTestsEnabled()
- {
- org.junit.Assume.assumeTrue(JMSStressTestBase.STRESS_TESTS_ENABLED);
- }
-
- // Constants -----------------------------------------------------
-
- private static JmsTestLogger log = JmsTestLogger.LOGGER;
-
- // Static --------------------------------------------------------
-
- // Attributes ----------------------------------------------------
-
- private InitialContext ic;
-
- // Constructors --------------------------------------------------
-
- // Public --------------------------------------------------------
-
- @Override
- @Before
- public void setUp() throws Exception
- {
- super.setUp();
-
- // ServerManagement.start("all");
- ic = getInitialContext();
- createTopic("StressTestTopic");
-
- RelayStressTest.log.debug("setup done");
- }
-
- @Override
- @After
- public void tearDown() throws Exception
- {
- destroyTopic("StressTestTopic");
- ic.close();
- }
-
- @Test
- public void testRelay() throws Exception
- {
- ConnectionFactory cf = (ConnectionFactory)ic.lookup("/ConnectionFactory");
-
- Topic topic = (Topic)ic.lookup("/topic/StressTestTopic");
-
- final int numMessages = 20000;
-
- final int numRelayers = 5;
-
- final int numConsumers = 20;
-
- Connection conn = cf.createConnection();
-
- class Relayer implements MessageListener
- {
- boolean done;
-
- boolean failed;
-
- int count;
-
- MessageProducer prod;
-
- Relayer(final MessageProducer prod)
- {
- this.prod = prod;
- }
-
- public void onMessage(final Message m)
- {
- try
- {
- m.clearProperties();
- m.setStringProperty("name", "Tim");
-
- prod.send(m);
-
- count++;
-
- if (count == numMessages)
- {
- synchronized (this)
- {
- done = true;
- notify();
- }
- }
- }
- catch (JMSException e)
- {
- e.printStackTrace();
- synchronized (this)
- {
- done = true;
- failed = true;
- notify();
- }
- }
- }
- }
-
- class Consumer implements MessageListener
- {
- boolean failed;
-
- boolean done;
-
- int count;
-
- public void onMessage(final Message m)
- {
- count++;
-
- if (count == numMessages * numRelayers)
- {
- synchronized (this)
- {
- done = true;
- notify();
- }
- }
- }
- }
-
- Relayer[] relayers = new Relayer[numRelayers];
-
- Consumer[] consumers = new Consumer[numConsumers];
-
- for (int i = 0; i < numRelayers; i++)
- {
- Session sess = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
-
- MessageConsumer cons = sess.createConsumer(topic, "name = 'Watt'");
- // MessageConsumer cons = sess.createConsumer(topic);
-
- MessageProducer prod = sess.createProducer(topic);
-
- relayers[i] = new Relayer(prod);
-
- cons.setMessageListener(relayers[i]);
- }
-
- for (int i = 0; i < numConsumers; i++)
- {
- Session sess = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
-
- MessageConsumer cons = sess.createConsumer(topic, "name = 'Tim'");
-
- consumers[i] = new Consumer();
-
- cons.setMessageListener(consumers[i]);
- }
-
- conn.start();
-
- Session sess = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
-
- MessageProducer prod = sess.createProducer(topic);
-
- prod.setDeliveryMode(DeliveryMode.NON_PERSISTENT);
-
- for (int i = 0; i < numMessages; i++)
- {
- Message m = sess.createMessage();
-
- m.setStringProperty("name", "Watt");
-
- prod.send(m);
- }
-
- for (int i = 0; i < numRelayers; i++)
- {
- synchronized (relayers[i])
- {
- if (!relayers[i].done)
- {
- relayers[i].wait();
- }
- }
- }
-
- for (int i = 0; i < numConsumers; i++)
- {
- synchronized (consumers[i])
- {
- if (!consumers[i].done)
- {
- consumers[i].wait();
- }
- }
- }
-
- conn.close();
-
- for (int i = 0; i < numRelayers; i++)
- {
- ProxyAssertSupport.assertFalse(relayers[i].failed);
- }
-
- for (int i = 0; i < numConsumers; i++)
- {
- ProxyAssertSupport.assertFalse(consumers[i].failed);
- }
-
- }
-}
diff --git a/tests/jms-tests/src/test/java/org/apache/activemq/jms/tests/stress/Runner.java b/tests/jms-tests/src/test/java/org/apache/activemq/jms/tests/stress/Runner.java
deleted file mode 100644
index d5aab5e900..0000000000
--- a/tests/jms-tests/src/test/java/org/apache/activemq/jms/tests/stress/Runner.java
+++ /dev/null
@@ -1,64 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.activemq.jms.tests.stress;
-
-import org.apache.activemq.jms.tests.JmsTestLogger;
-
-import javax.jms.Session;
-
-/**
- *
- * A Runner.
- *
- * Base class for running components of a stress test
- *
- * @author Tim Fox
- *
- */
-public abstract class Runner implements Runnable
-{
- protected JmsTestLogger log = JmsTestLogger.LOGGER;
-
- protected Session sess;
-
- protected int numMessages;
-
- private boolean failed;
-
- public Runner(final Session sess, final int numMessages)
- {
- this.sess = sess;
- this.numMessages = numMessages;
- }
-
- public abstract void run();
-
- public boolean isFailed()
- {
- return failed;
- }
-
- public void setFailed(final boolean failed)
- {
- this.failed = failed;
- if (failed)
- {
- log.info("Marking Runner " + this + " as failed", new Exception("trace"));
- }
- }
-
-}
diff --git a/tests/jms-tests/src/test/java/org/apache/activemq/jms/tests/stress/Sender.java b/tests/jms-tests/src/test/java/org/apache/activemq/jms/tests/stress/Sender.java
deleted file mode 100644
index f57a375ce0..0000000000
--- a/tests/jms-tests/src/test/java/org/apache/activemq/jms/tests/stress/Sender.java
+++ /dev/null
@@ -1,72 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.activemq.jms.tests.stress;
-
-import org.apache.activemq.jms.tests.JmsTestLogger;
-
-import javax.jms.Message;
-import javax.jms.MessageProducer;
-import javax.jms.Session;
-
-/**
- *
- * A Sender.
- *
- * Sends messages to a destination, used in stress testing
- *
- * @author Tim Fox
- *
- */
-public class Sender extends Runner
-{
- private static final JmsTestLogger log = JmsTestLogger.LOGGER;
-
- protected MessageProducer prod;
-
- protected String prodName;
-
- protected int count;
-
- public Sender(final String prodName, final Session sess, final MessageProducer prod, final int numMessages)
- {
- super(sess, numMessages);
- this.prod = prod;
- this.prodName = prodName;
- }
-
- @Override
- public void run()
- {
- try
- {
- while (count < numMessages)
- {
- Message m = sess.createMessage();
- m.setStringProperty("PROD_NAME", prodName);
- m.setIntProperty("MSG_NUMBER", count);
- prod.send(m);
- count++;
- }
- }
- catch (Exception e)
- {
- Sender.log.error("Failed to send message", e);
- setFailed(true);
- }
- }
-
-}
diff --git a/tests/jms-tests/src/test/java/org/apache/activemq/jms/tests/stress/SeveralClientsStressTest.java b/tests/jms-tests/src/test/java/org/apache/activemq/jms/tests/stress/SeveralClientsStressTest.java
deleted file mode 100644
index 72b33b3ddd..0000000000
--- a/tests/jms-tests/src/test/java/org/apache/activemq/jms/tests/stress/SeveralClientsStressTest.java
+++ /dev/null
@@ -1,547 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.activemq.jms.tests.stress;
-import java.util.HashSet;
-import java.util.Random;
-import java.util.concurrent.LinkedBlockingQueue;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicInteger;
-
-import javax.jms.Connection;
-import javax.jms.ConnectionFactory;
-import javax.jms.DeliveryMode;
-import javax.jms.Message;
-import javax.jms.MessageConsumer;
-import javax.jms.MessageProducer;
-import javax.jms.Queue;
-import javax.jms.Session;
-import javax.naming.Context;
-
-import org.apache.activemq.jms.tests.ActiveMQServerTestCase;
-import org.apache.activemq.jms.tests.JmsTestLogger;
-import org.apache.activemq.jms.tests.util.ProxyAssertSupport;
-import org.junit.Before;
-import org.junit.BeforeClass;
-import org.junit.Test;
-
-/**
- * In order for this test to run, you will need to edit /etc/security/limits.conf and change your max sockets to something bigger than 1024
- *
- * It's required to re-login after this change.
- *
- * For Windows you need also to increase this limit (max opened files) somehow.
- *
- *
-Example of /etc/security/limits.confg:
-# -
-clebert hard nofile 10240
-
-
- * @author Clebert Suconic
- */
-public class SeveralClientsStressTest extends ActiveMQServerTestCase
-{
- @BeforeClass
- public static void stressTestsEnabled()
- {
- org.junit.Assume.assumeTrue(JMSStressTestBase.STRESS_TESTS_ENABLED);
- }
-
- protected boolean info = false;
-
- protected boolean startServer = true;
-
- // Static ---------------------------------------------------------------------------------------
-
- protected static long PRODUCER_ALIVE_FOR = 60000; // one minute
-
- protected static long CONSUMER_ALIVE_FOR = 60000; // one minutes
-
- protected static long TEST_ALIVE_FOR = 5 * 60 * 1000; // 5 minutes
-
- protected static int NUMBER_OF_PRODUCERS = 100; // this should be set to 300 later
-
- protected static int NUMBER_OF_CONSUMERS = 100; // this should be set to 300 later
-
- // a producer should have a long wait between each message sent?
- protected static boolean LONG_WAIT_ON_PRODUCERS = false;
-
- protected static AtomicInteger producedMessages = new AtomicInteger(0);
-
- protected static AtomicInteger readMessages = new AtomicInteger(0);
-
- protected Context createContext() throws Exception
- {
- return getInitialContext();
- }
-
- // Constructors ---------------------------------------------------------------------------------
-
- // Public ---------------------------------------------------------------------------------------
-
- @Test
- public void testQueue() throws Exception
- {
- Context ctx = createContext();
-
- HashSet threads = new HashSet();
-
- // A chhanel of communication between workers and the test method
- LinkedBlockingQueue testChannel = new LinkedBlockingQueue();
-
- for (int i = 0; i < SeveralClientsStressTest.NUMBER_OF_PRODUCERS; i++)
- {
- threads.add(new SeveralClientsStressTest.Producer(i, testChannel));
- }
-
- for (int i = 0; i < SeveralClientsStressTest.NUMBER_OF_CONSUMERS; i++)
- {
- threads.add(new SeveralClientsStressTest.Consumer(i, testChannel));
- }
-
- for (Worker worker : threads)
- {
- worker.start();
- }
-
- long timeToFinish = System.currentTimeMillis() + SeveralClientsStressTest.TEST_ALIVE_FOR;
-
- int numberOfProducers = SeveralClientsStressTest.NUMBER_OF_PRODUCERS;
- int numberOfConsumers = SeveralClientsStressTest.NUMBER_OF_CONSUMERS;
-
- while (threads.size() > 0)
- {
- SeveralClientsStressTest.InternalMessage msg = testChannel.poll(2000,
- TimeUnit.MILLISECONDS);
-
- log.info("Produced:" + SeveralClientsStressTest.producedMessages.get() +
- " and Consumed:" +
- SeveralClientsStressTest.readMessages.get() +
- " messages");
-
- if (msg != null)
- {
- if (info)
- {
- log.info("Received message " + msg);
- }
- if (msg instanceof SeveralClientsStressTest.WorkerFailed)
- {
- ProxyAssertSupport.fail("Worker " + msg.getWorker() + " has failed");
- }
- else if (msg instanceof SeveralClientsStressTest.WorkedFinishedMessages)
- {
- SeveralClientsStressTest.WorkedFinishedMessages finished = (SeveralClientsStressTest.WorkedFinishedMessages)msg;
- if (threads.remove(finished.getWorker()))
- {
- if (System.currentTimeMillis() < timeToFinish)
- {
- if (finished.getWorker() instanceof SeveralClientsStressTest.Producer)
- {
- if (info)
- {
- log.info("Scheduling new Producer " + numberOfProducers);
- }
- SeveralClientsStressTest.Producer producer = new SeveralClientsStressTest.Producer(numberOfProducers++,
- testChannel);
- threads.add(producer);
- producer.start();
- }
- else if (finished.getWorker() instanceof SeveralClientsStressTest.Consumer)
- {
- if (info)
- {
- log.info("Scheduling new ClientConsumer " + numberOfConsumers);
- }
- SeveralClientsStressTest.Consumer consumer = new SeveralClientsStressTest.Consumer(numberOfConsumers++,
- testChannel);
- threads.add(consumer);
- consumer.start();
- }
- }
- }
- else
- {
- log.warn(finished.getWorker() + " was not available on threads HashSet");
- }
- }
- }
- }
-
- log.info("Produced:" + SeveralClientsStressTest.producedMessages.get() +
- " and Consumed:" +
- SeveralClientsStressTest.readMessages.get() +
- " messages");
-
- clearMessages();
-
- log.info("Produced:" + SeveralClientsStressTest.producedMessages.get() +
- " and Consumed:" +
- SeveralClientsStressTest.readMessages.get() +
- " messages");
-
- ProxyAssertSupport.assertEquals(SeveralClientsStressTest.producedMessages.get(),
- SeveralClientsStressTest.readMessages.get());
- }
-
- // Package protected ----------------------------------------------------------------------------
-
- // Protected ------------------------------------------------------------------------------------
-
- protected void clearMessages() throws Exception
- {
- Context ctx = createContext();
- ConnectionFactory cf = (ConnectionFactory)ctx.lookup("/ClusteredConnectionFactory");
- Connection conn = cf.createConnection();
- Session sess = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
- Queue queue = (Queue)ctx.lookup("queue/testQueue");
- MessageConsumer consumer = sess.createConsumer(queue);
-
- conn.start();
-
- while (consumer.receive(1000) != null)
- {
- SeveralClientsStressTest.readMessages.incrementAndGet();
- log.info("Received JMS message on clearMessages");
- }
-
- conn.close();
- }
-
- @Override
- @Before
- public void setUp() throws Exception
- {
- super.setUp();
-
- if (startServer)
- {
- // ServerManagement.start("all", true);
- createQueue("testQueue");
- }
-
- clearMessages();
- SeveralClientsStressTest.producedMessages = new AtomicInteger(0);
- SeveralClientsStressTest.readMessages = new AtomicInteger(0);
- }
-
- // Private --------------------------------------------------------------------------------------
-
- // Inner classes --------------------------------------------------------------------------------
-
- private class Worker extends Thread
- {
-
- protected JmsTestLogger log = JmsTestLogger.LOGGER;
-
- private boolean failed = false;
-
- private final int workerId;
-
- private Exception ex;
-
- LinkedBlockingQueue messageQueue;
-
- public int getWorkerId()
- {
- return workerId;
- }
-
- public Exception getException()
- {
- return ex;
- }
-
- public boolean isFailed()
- {
- return failed;
- }
-
- protected synchronized void setFailed(final boolean failed, final Exception ex)
- {
- this.failed = failed;
- this.ex = ex;
-
- log.info("Sending Exception", ex);
-
- sendInternalMessage(new SeveralClientsStressTest.WorkerFailed(this));
-
- }
-
- protected void sendInternalMessage(final SeveralClientsStressTest.InternalMessage msg)
- {
- if (info)
- {
- log.info("Sending message " + msg);
- }
- try
- {
- messageQueue.put(msg);
- }
- catch (Exception e)
- {
- log.error(e, e);
- setFailed(true, e);
- }
- }
-
- public Worker(final String name, final int workerId,
- final LinkedBlockingQueue messageQueue)
- {
- super(name);
- this.workerId = workerId;
- this.messageQueue = messageQueue;
- setDaemon(true);
- }
-
- @Override
- public String toString()
- {
- return this.getClass().getName() + ":" + getWorkerId();
- }
- }
-
- final class Producer extends SeveralClientsStressTest.Worker
- {
- public Producer(final int producerId,
- final LinkedBlockingQueue messageQueue)
- {
- super("Producer:" + producerId, producerId, messageQueue);
- }
-
- Random random = new Random();
-
- @Override
- public void run()
- {
- try
- {
- Context ctx = createContext();
-
- ConnectionFactory cf = (ConnectionFactory)ctx.lookup("/ClusteredConnectionFactory");
-
- Queue queue = (Queue)ctx.lookup("queue/testQueue");
-
- if (info)
- {
- log.info("Creating connection and producer");
- }
- Connection conn = cf.createConnection();
- Session sess = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
- MessageProducer prod = sess.createProducer(queue);
-
- if (getWorkerId() % 2 == 0)
- {
- if (info)
- {
- log.info("Non Persistent Producer was created");
- }
- prod.setDeliveryMode(DeliveryMode.NON_PERSISTENT);
- }
- else
- {
- if (info)
- {
- log.info("Persistent Producer was created");
- }
- prod.setDeliveryMode(DeliveryMode.PERSISTENT);
- }
-
- long timeToFinish = System.currentTimeMillis() + SeveralClientsStressTest.PRODUCER_ALIVE_FOR;
-
- try
- {
- int messageSent = 0;
- while (System.currentTimeMillis() < timeToFinish)
- {
- prod.send(sess.createTextMessage("Message sent at " + System.currentTimeMillis()));
- SeveralClientsStressTest.producedMessages.incrementAndGet();
- messageSent++;
- if (messageSent % 50 == 0)
- {
- if (info)
- {
- log.info("Sent " + messageSent + " Messages");
- }
- }
-
- if (SeveralClientsStressTest.LONG_WAIT_ON_PRODUCERS)
- {
- int waitTime = random.nextInt() % 2 + 1;
- if (waitTime < 0)
- {
- waitTime *= -1;
- }
- Thread.sleep(waitTime * 1000); // wait 1 or 2 seconds
- }
- else
- {
- Thread.sleep(100);
- }
- }
- sendInternalMessage(new SeveralClientsStressTest.WorkedFinishedMessages(this));
- }
- finally
- {
- conn.close();
- }
-
- }
- catch (Exception e)
- {
- log.error(e, e);
- setFailed(true, e);
- }
- }
- }
-
- final class Consumer extends SeveralClientsStressTest.Worker
- {
- public Consumer(final int consumerId,
- final LinkedBlockingQueue messageQueue)
- {
- super("ClientConsumer:" + consumerId, consumerId, messageQueue);
- }
-
- @Override
- public void run()
- {
- try
- {
- Context ctx = createContext();
-
- ConnectionFactory cf = (ConnectionFactory)ctx.lookup("/ClusteredConnectionFactory");
-
- Queue queue = (Queue)ctx.lookup("queue/testQueue");
-
- if (info)
- {
- log.info("Creating connection and consumer");
- }
- Connection conn = cf.createConnection();
- Session sess = conn.createSession(true, Session.SESSION_TRANSACTED);
- MessageConsumer consumer = sess.createConsumer(queue);
- if (info)
- {
- log.info("ClientConsumer was created");
- }
-
- conn.start();
-
- int msgs = 0;
-
- int transactions = 0;
-
- long timeToFinish = System.currentTimeMillis() + SeveralClientsStressTest.CONSUMER_ALIVE_FOR;
-
- try
- {
- while (System.currentTimeMillis() < timeToFinish)
- {
- Message msg = consumer.receive(1000);
- if (msg != null)
- {
- msgs++;
- if (msgs >= 50)
- {
- transactions++;
- if (transactions % 2 == 0)
- {
- if (info)
- {
- log.info("Commit transaction");
- }
- sess.commit();
- SeveralClientsStressTest.readMessages.addAndGet(msgs);
- }
- else
- {
- if (info)
- {
- log.info("Rollback transaction");
- }
- sess.rollback();
- }
- msgs = 0;
- }
- }
- else
- {
- break;
- }
- }
-
- SeveralClientsStressTest.readMessages.addAndGet(msgs);
- sess.commit();
-
- sendInternalMessage(new SeveralClientsStressTest.WorkedFinishedMessages(this));
- }
- finally
- {
- conn.close();
- }
-
- }
- catch (Exception e)
- {
- log.error(e);
- setFailed(true, e);
- }
- }
- }
-
- // Objects used on the communication between Workers and the test
- static class InternalMessage
- {
- SeveralClientsStressTest.Worker worker;
-
- public InternalMessage(final SeveralClientsStressTest.Worker worker)
- {
- this.worker = worker;
- }
-
- public SeveralClientsStressTest.Worker getWorker()
- {
- return worker;
- }
-
- @Override
- public String toString()
- {
- return this.getClass().getName() + " worker-> " + worker.toString();
- }
- }
-
- static class WorkedFinishedMessages extends SeveralClientsStressTest.InternalMessage
- {
-
- public WorkedFinishedMessages(final SeveralClientsStressTest.Worker worker)
- {
- super(worker);
- }
-
- }
-
- static class WorkerFailed extends SeveralClientsStressTest.InternalMessage
- {
- public WorkerFailed(final SeveralClientsStressTest.Worker worker)
- {
- super(worker);
- }
- }
-
-}
diff --git a/tests/jms-tests/src/test/java/org/apache/activemq/jms/tests/stress/TopicStressTest.java b/tests/jms-tests/src/test/java/org/apache/activemq/jms/tests/stress/TopicStressTest.java
deleted file mode 100644
index f81099e12f..0000000000
--- a/tests/jms-tests/src/test/java/org/apache/activemq/jms/tests/stress/TopicStressTest.java
+++ /dev/null
@@ -1,297 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.activemq.jms.tests.stress;
-
-import javax.jms.Connection;
-import javax.jms.DeliveryMode;
-import javax.jms.MessageConsumer;
-import javax.jms.MessageProducer;
-import javax.jms.Session;
-import javax.jms.XAConnection;
-import javax.jms.XASession;
-
-import org.junit.Test;
-
-/**
- * A TopicStressTest.
- *
- * @author Tim Fox
- * @version $Revision: 2349 $
- */
-public class TopicStressTest extends JMSStressTestBase
-{
- /*
- * Stress a topic with with many non transactional, transactional and 2pc receivers.
- * Non transactional receivers use ack modes of auto, dups and client ack.
- * Client ack receivers go through a cycle of receiving a batch, acking and recovering
- * Transactional receivers go through a cycle of receiving committing and rolling back.
- * Half the consumers are durable and half non durable.
- *
- */
- @Test
- public void testTopicMultipleReceivers() throws Exception
- {
- Connection conn1 = cf.createConnection();
-
- Session sess1 = conn1.createSession(false, Session.AUTO_ACKNOWLEDGE);
- Session sess2 = conn1.createSession(false, Session.AUTO_ACKNOWLEDGE);
-
- MessageProducer prod1 = sess1.createProducer(topic1);
- prod1.setDeliveryMode(DeliveryMode.NON_PERSISTENT);
-
- MessageProducer prod2 = sess2.createProducer(topic1);
- prod2.setDeliveryMode(DeliveryMode.PERSISTENT);
-
- Connection conn2 = cf.createConnection();
- conn2.setClientID("clientid1");
- conn2.start();
-
- // 4 auto ack
- Session rsess1 = conn2.createSession(false, Session.AUTO_ACKNOWLEDGE);
- Session rsess2 = conn2.createSession(false, Session.AUTO_ACKNOWLEDGE);
- Session rsess3 = conn2.createSession(false, Session.AUTO_ACKNOWLEDGE);
- Session rsess4 = conn2.createSession(false, Session.AUTO_ACKNOWLEDGE);
-
- // 4 dups
- Session rsess5 = conn2.createSession(false, Session.DUPS_OK_ACKNOWLEDGE);
- Session rsess6 = conn2.createSession(false, Session.DUPS_OK_ACKNOWLEDGE);
- Session rsess7 = conn2.createSession(false, Session.DUPS_OK_ACKNOWLEDGE);
- Session rsess8 = conn2.createSession(false, Session.DUPS_OK_ACKNOWLEDGE);
-
- // 4 client
- Session rsess9 = conn2.createSession(false, Session.CLIENT_ACKNOWLEDGE);
- Session rsess10 = conn2.createSession(false, Session.CLIENT_ACKNOWLEDGE);
- Session rsess11 = conn2.createSession(false, Session.CLIENT_ACKNOWLEDGE);
- Session rsess12 = conn2.createSession(false, Session.CLIENT_ACKNOWLEDGE);
-
- // 4 transactional
- Session rsess13 = conn2.createSession(true, Session.SESSION_TRANSACTED);
- Session rsess14 = conn2.createSession(true, Session.SESSION_TRANSACTED);
- Session rsess15 = conn2.createSession(true, Session.SESSION_TRANSACTED);
- Session rsess16 = conn2.createSession(true, Session.SESSION_TRANSACTED);
-
- // 4 2pc transactional
- XASession rxaSess1 = ((XAConnection) conn2).createXASession();
- tweakXASession(rxaSess1);
- XASession rxaSess2 = ((XAConnection) conn2).createXASession();
- tweakXASession(rxaSess2);
- XASession rxaSess3 = ((XAConnection) conn2).createXASession();
- tweakXASession(rxaSess3);
- XASession rxaSess4 = ((XAConnection) conn2).createXASession();
- tweakXASession(rxaSess4);
-
- Session rsess17 = rxaSess1.getSession();
- Session rsess18 = rxaSess2.getSession();
- Session rsess19 = rxaSess3.getSession();
- Session rsess20 = rxaSess4.getSession();
-
- MessageConsumer cons1 = rsess1.createConsumer(topic1);
- MessageConsumer cons2 = rsess2.createDurableSubscriber(topic1, "sub1");
- MessageConsumer cons3 = rsess3.createConsumer(topic1);
- MessageConsumer cons4 = rsess4.createDurableSubscriber(topic1, "sub2");
- MessageConsumer cons5 = rsess5.createConsumer(topic1);
- MessageConsumer cons6 = rsess6.createDurableSubscriber(topic1, "sub3");
- MessageConsumer cons7 = rsess7.createConsumer(topic1);
- MessageConsumer cons8 = rsess8.createDurableSubscriber(topic1, "sub4");
- MessageConsumer cons9 = rsess9.createConsumer(topic1);
- MessageConsumer cons10 = rsess10.createDurableSubscriber(topic1, "sub5");
- MessageConsumer cons11 = rsess11.createConsumer(topic1);
- MessageConsumer cons12 = rsess12.createDurableSubscriber(topic1, "sub6");
- MessageConsumer cons13 = rsess13.createConsumer(topic1);
- MessageConsumer cons14 = rsess14.createDurableSubscriber(topic1, "sub7");
- MessageConsumer cons15 = rsess15.createConsumer(topic1);
- MessageConsumer cons16 = rsess16.createDurableSubscriber(topic1, "sub8");
- MessageConsumer cons17 = rsess17.createConsumer(topic1);
- MessageConsumer cons18 = rsess18.createDurableSubscriber(topic1, "sub9");
- MessageConsumer cons19 = rsess19.createConsumer(topic1);
- MessageConsumer cons20 = rsess20.createDurableSubscriber(topic1, "sub10");
-
- // To make sure paging occurs first send some messages before receiving
-
- Runner[] runners = new Runner[]{
- new Sender("prod1", sess1, prod1, JMSStressTestBase.NUM_NON_PERSISTENT_PRESEND),
- new Sender("prod2", sess2, prod2, JMSStressTestBase.NUM_PERSISTENT_PRESEND)};
-
- runRunners(runners);
-
- runners = new Runner[]{
- // 4 auto ack
- new Receiver(rsess1,
- cons1,
- JMSStressTestBase.NUM_NON_PERSISTENT_MESSAGES + JMSStressTestBase.NUM_PERSISTENT_MESSAGES +
- JMSStressTestBase.NUM_NON_PERSISTENT_PRESEND +
- JMSStressTestBase.NUM_PERSISTENT_PRESEND,
- false),
- new Receiver(rsess2,
- cons2,
- JMSStressTestBase.NUM_NON_PERSISTENT_MESSAGES + JMSStressTestBase.NUM_PERSISTENT_MESSAGES +
- JMSStressTestBase.NUM_NON_PERSISTENT_PRESEND +
- JMSStressTestBase.NUM_PERSISTENT_PRESEND,
- true),
- new Receiver(rsess3,
- cons3,
- JMSStressTestBase.NUM_NON_PERSISTENT_MESSAGES + JMSStressTestBase.NUM_PERSISTENT_MESSAGES +
- JMSStressTestBase.NUM_NON_PERSISTENT_PRESEND +
- JMSStressTestBase.NUM_PERSISTENT_PRESEND,
- false),
- new Receiver(rsess4,
- cons4,
- JMSStressTestBase.NUM_NON_PERSISTENT_MESSAGES + JMSStressTestBase.NUM_PERSISTENT_MESSAGES +
- JMSStressTestBase.NUM_NON_PERSISTENT_PRESEND +
- JMSStressTestBase.NUM_PERSISTENT_PRESEND,
- true),
-
- // 4 dups ok
- new Receiver(rsess5,
- cons5,
- JMSStressTestBase.NUM_NON_PERSISTENT_MESSAGES + JMSStressTestBase.NUM_PERSISTENT_MESSAGES +
- JMSStressTestBase.NUM_NON_PERSISTENT_PRESEND +
- JMSStressTestBase.NUM_PERSISTENT_PRESEND,
- false),
- new Receiver(rsess6,
- cons6,
- JMSStressTestBase.NUM_NON_PERSISTENT_MESSAGES + JMSStressTestBase.NUM_PERSISTENT_MESSAGES +
- JMSStressTestBase.NUM_NON_PERSISTENT_PRESEND +
- JMSStressTestBase.NUM_PERSISTENT_PRESEND,
- true),
- new Receiver(rsess7,
- cons7,
- JMSStressTestBase.NUM_NON_PERSISTENT_MESSAGES + JMSStressTestBase.NUM_PERSISTENT_MESSAGES +
- JMSStressTestBase.NUM_NON_PERSISTENT_PRESEND +
- JMSStressTestBase.NUM_PERSISTENT_PRESEND,
- false),
- new Receiver(rsess8,
- cons8,
- JMSStressTestBase.NUM_NON_PERSISTENT_MESSAGES + JMSStressTestBase.NUM_PERSISTENT_MESSAGES +
- JMSStressTestBase.NUM_NON_PERSISTENT_PRESEND +
- JMSStressTestBase.NUM_PERSISTENT_PRESEND,
- true),
-
- // 4 client ack
- new RecoveringReceiver(rsess9,
- cons9,
- JMSStressTestBase.NUM_NON_PERSISTENT_MESSAGES + JMSStressTestBase.NUM_PERSISTENT_MESSAGES +
- JMSStressTestBase.NUM_NON_PERSISTENT_PRESEND +
- JMSStressTestBase.NUM_PERSISTENT_PRESEND,
- 1,
- 1,
- false),
- new RecoveringReceiver(rsess10,
- cons10,
- JMSStressTestBase.NUM_NON_PERSISTENT_MESSAGES + JMSStressTestBase.NUM_PERSISTENT_MESSAGES +
- JMSStressTestBase.NUM_NON_PERSISTENT_PRESEND +
- JMSStressTestBase.NUM_PERSISTENT_PRESEND,
- 10,
- 7,
- true),
- new RecoveringReceiver(rsess11,
- cons11,
- JMSStressTestBase.NUM_NON_PERSISTENT_MESSAGES + JMSStressTestBase.NUM_PERSISTENT_MESSAGES +
- JMSStressTestBase.NUM_NON_PERSISTENT_PRESEND +
- JMSStressTestBase.NUM_PERSISTENT_PRESEND,
- 50,
- 21,
- false),
- new RecoveringReceiver(rsess12,
- cons12,
- JMSStressTestBase.NUM_NON_PERSISTENT_MESSAGES + JMSStressTestBase.NUM_PERSISTENT_MESSAGES +
- JMSStressTestBase.NUM_NON_PERSISTENT_PRESEND +
- JMSStressTestBase.NUM_PERSISTENT_PRESEND,
- 100,
- 67,
- true),
-
- // 4 transactional
-
- new TransactionalReceiver(rsess13,
- cons13,
- JMSStressTestBase.NUM_NON_PERSISTENT_MESSAGES + JMSStressTestBase.NUM_PERSISTENT_MESSAGES +
- JMSStressTestBase.NUM_NON_PERSISTENT_PRESEND +
- JMSStressTestBase.NUM_PERSISTENT_PRESEND,
- 1,
- 1,
- false),
- new TransactionalReceiver(rsess14,
- cons14,
- JMSStressTestBase.NUM_NON_PERSISTENT_MESSAGES + JMSStressTestBase.NUM_PERSISTENT_MESSAGES +
- JMSStressTestBase.NUM_NON_PERSISTENT_PRESEND +
- JMSStressTestBase.NUM_PERSISTENT_PRESEND,
- 10,
- 7,
- true),
- new TransactionalReceiver(rsess15,
- cons15,
- JMSStressTestBase.NUM_NON_PERSISTENT_MESSAGES + JMSStressTestBase.NUM_PERSISTENT_MESSAGES +
- JMSStressTestBase.NUM_NON_PERSISTENT_PRESEND +
- JMSStressTestBase.NUM_PERSISTENT_PRESEND,
- 50,
- 21,
- false),
- new TransactionalReceiver(rsess16,
- cons16,
- JMSStressTestBase.NUM_NON_PERSISTENT_MESSAGES + JMSStressTestBase.NUM_PERSISTENT_MESSAGES +
- JMSStressTestBase.NUM_NON_PERSISTENT_PRESEND +
- JMSStressTestBase.NUM_PERSISTENT_PRESEND,
- 100,
- 67,
- true),
-
- // 4 2pc transactional
- new Transactional2PCReceiver(rxaSess1,
- cons17,
- JMSStressTestBase.NUM_NON_PERSISTENT_MESSAGES + JMSStressTestBase.NUM_PERSISTENT_MESSAGES +
- JMSStressTestBase.NUM_NON_PERSISTENT_PRESEND +
- JMSStressTestBase.NUM_PERSISTENT_PRESEND,
- 1,
- 1,
- false),
- new Transactional2PCReceiver(rxaSess2,
- cons18,
- JMSStressTestBase.NUM_NON_PERSISTENT_MESSAGES + JMSStressTestBase.NUM_PERSISTENT_MESSAGES +
- JMSStressTestBase.NUM_NON_PERSISTENT_PRESEND +
- JMSStressTestBase.NUM_PERSISTENT_PRESEND,
- 10,
- 7,
- true),
- new Transactional2PCReceiver(rxaSess3,
- cons19,
- JMSStressTestBase.NUM_NON_PERSISTENT_MESSAGES + JMSStressTestBase.NUM_PERSISTENT_MESSAGES +
- JMSStressTestBase.NUM_NON_PERSISTENT_PRESEND +
- JMSStressTestBase.NUM_PERSISTENT_PRESEND,
- 50,
- 21,
- false),
- new Transactional2PCReceiver(rxaSess4,
- cons20,
- JMSStressTestBase.NUM_NON_PERSISTENT_MESSAGES + JMSStressTestBase.NUM_PERSISTENT_MESSAGES +
- JMSStressTestBase.NUM_NON_PERSISTENT_PRESEND +
- JMSStressTestBase.NUM_PERSISTENT_PRESEND,
- 100,
- 67,
- true),
-
- new Sender("prod3", sess1, prod1, JMSStressTestBase.NUM_NON_PERSISTENT_MESSAGES),
- new Sender("prod4", sess2, prod2, JMSStressTestBase.NUM_PERSISTENT_MESSAGES)};
-
- runRunners(runners);
-
- conn1.close();
-
- conn2.close();
- }
-
-}
diff --git a/tests/jms-tests/src/test/java/org/apache/activemq/jms/tests/stress/Transactional2PCReceiver.java b/tests/jms-tests/src/test/java/org/apache/activemq/jms/tests/stress/Transactional2PCReceiver.java
deleted file mode 100644
index 31e611cd50..0000000000
--- a/tests/jms-tests/src/test/java/org/apache/activemq/jms/tests/stress/Transactional2PCReceiver.java
+++ /dev/null
@@ -1,231 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.activemq.jms.tests.stress;
-
-import javax.jms.Message;
-import javax.jms.MessageConsumer;
-import javax.jms.XASession;
-import javax.transaction.xa.XAResource;
-
-import org.apache.activemq.core.transaction.impl.XidImpl;
-import org.apache.activemq.jms.tests.JmsTestLogger;
-import org.apache.activemq.utils.UUIDGenerator;
-
-/**
- *
- * A receiver that receives messages in a XA transaction
- *
- * Receives messages then prepares, commits, then
- * Receives messages then prepares, rollsback until
- * a total of messages have been received (committed)
- * must be a multiple of
- *
- * @author Tim Fox
- *
- */
-public class Transactional2PCReceiver extends Receiver
-{
- private static final JmsTestLogger log = JmsTestLogger.LOGGER;
-
- protected int commitSize;
-
- protected int rollbackSize;
-
- protected XAResource xaResource;
-
- class Count
- {
- int lastCommitted;
-
- int lastReceived;
- }
-
- public Transactional2PCReceiver(final XASession sess,
- final MessageConsumer cons,
- final int numMessages,
- final int commitSize,
- final int rollbackSize,
- final boolean isListener) throws Exception
- {
- super(sess, cons, numMessages, isListener);
- this.commitSize = commitSize;
- this.rollbackSize = rollbackSize;
- xaResource = sess.getXAResource();
- }
-
- @Override
- public void run()
- {
- // Small pause so as not to miss any messages in a topic
- try
- {
- Thread.sleep(1000);
- }
- catch (InterruptedException e)
- {
- }
-
- try
- {
- int iterations = numMessages / commitSize;
-
- XidImpl xid = null;
-
- xid = new XidImpl("bq1".getBytes(), 1, UUIDGenerator.getInstance().generateStringUUID().getBytes());
- xaResource.start(xid, XAResource.TMNOFLAGS);
-
- for (int outerCount = 0; outerCount < iterations; outerCount++)
- {
-
- for (int innerCount = 0; innerCount < commitSize; innerCount++)
- {
- Message m = getMessage();
-
- if (m == null)
- {
- Transactional2PCReceiver.log.error("Message is null");
- setFailed(true);
- return;
- }
- String prodName = m.getStringProperty("PROD_NAME");
- Integer msgCount = new Integer(m.getIntProperty("MSG_NUMBER"));
-
- Count count = (Count)counts.get(prodName);
- if (count == null)
- {
- // First time
- if (msgCount.intValue() != 0)
- {
- Transactional2PCReceiver.log.error("First message from " + prodName +
- " is not 0, it is " +
- msgCount);
- setFailed(true);
- return;
- }
- else
- {
- count = new Count();
- counts.put(prodName, count);
- }
- }
- else
- {
- if (count.lastCommitted != msgCount.intValue() - 1)
- {
- Transactional2PCReceiver.log.error("Message out of sequence for " + prodName +
- ", expected " +
- (count.lastCommitted + 1) +
- ", actual " +
- msgCount);
- setFailed(true);
- return;
- }
- }
- count.lastCommitted = msgCount.intValue();
-
- count.lastReceived = msgCount.intValue();
-
- if (innerCount == commitSize - 1)
- {
- xaResource.end(xid, XAResource.TMSUCCESS);
- xaResource.prepare(xid);
- xaResource.commit(xid, false);
-
- // Starting new tx
- xid = new XidImpl("bq1".getBytes(), 1, UUIDGenerator.getInstance().generateStringUUID().getBytes());
- xaResource.start(xid, XAResource.TMNOFLAGS);
-
- }
-
- processingDone();
- }
-
- if (outerCount == iterations - 1)
- {
- break;
- }
-
- for (int innerCount = 0; innerCount < rollbackSize; innerCount++)
- {
- Message m = getMessage();
-
- if (m == null)
- {
- Transactional2PCReceiver.log.error("Message is null (rollback)");
- setFailed(true);
- return;
- }
- String prodName = m.getStringProperty("PROD_NAME");
- Integer msgCount = new Integer(m.getIntProperty("MSG_NUMBER"));
-
- Count count = (Count)counts.get(prodName);
- if (count == null)
- {
- // First time
- if (msgCount.intValue() != 0)
- {
- Transactional2PCReceiver.log.error("First message from " + prodName +
- " is not 0, it is " +
- msgCount);
- setFailed(true);
- return;
- }
- else
- {
- count = new Count();
- count.lastCommitted = -1;
- counts.put(prodName, count);
- }
- }
- else
- {
- if (count.lastReceived != msgCount.intValue() - 1)
- {
- Transactional2PCReceiver.log.error("Message out of sequence");
- setFailed(true);
- return;
- }
- }
- count.lastReceived = msgCount.intValue();
-
- if (innerCount == rollbackSize - 1)
- {
- xaResource.end(xid, XAResource.TMSUCCESS);
- xaResource.prepare(xid);
- xaResource.rollback(xid);
-
- xid = new XidImpl("bq1".getBytes(), 1, UUIDGenerator.getInstance().generateStringUUID().getBytes());
- xaResource.start(xid, XAResource.TMNOFLAGS);
- }
- processingDone();
- }
- }
-
- xaResource.end(xid, XAResource.TMSUCCESS);
- xaResource.prepare(xid);
- xaResource.commit(xid, false);
-
- finished();
-
- }
- catch (Exception e)
- {
- Transactional2PCReceiver.log.error("Failed to receive message", e);
- setFailed(true);
- }
- }
-}
diff --git a/tests/jms-tests/src/test/java/org/apache/activemq/jms/tests/stress/Transactional2PCSender.java b/tests/jms-tests/src/test/java/org/apache/activemq/jms/tests/stress/Transactional2PCSender.java
deleted file mode 100644
index e1e1880c7b..0000000000
--- a/tests/jms-tests/src/test/java/org/apache/activemq/jms/tests/stress/Transactional2PCSender.java
+++ /dev/null
@@ -1,120 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.activemq.jms.tests.stress;
-
-import javax.jms.Message;
-import javax.jms.MessageProducer;
-import javax.jms.XASession;
-import javax.transaction.xa.XAResource;
-
-import org.apache.activemq.core.transaction.impl.XidImpl;
-import org.apache.activemq.jms.tests.JmsTestLogger;
-import org.apache.activemq.utils.UUIDGenerator;
-
-/**
- *
- * A Sender that sends messages to a destination in an XA transaction
- *
- * Sends messages to a destination in a jms transaction.
- * Sends messages then prepares, commits, then
- * sends messages then prepares, rollsback until
- * a total of messages have been sent (commitSize)
- * must be a multiple of
- *
- * @author Tim Fox
- *
- */
-public class Transactional2PCSender extends Sender
-{
- private static final JmsTestLogger log = JmsTestLogger.LOGGER;
-
- protected int commitSize;
-
- protected int rollbackSize;
-
- protected XAResource xaResource;
-
- public Transactional2PCSender(final String prodName,
- final XASession sess,
- final MessageProducer prod,
- final int numMessages,
- final int commitSize,
- final int rollbackSize)
- {
- super(prodName, sess, prod, numMessages);
-
- this.commitSize = commitSize;
- this.rollbackSize = rollbackSize;
- xaResource = sess.getXAResource();
- }
-
- @Override
- public void run()
- {
-
- int iterations = numMessages / commitSize;
-
- try
- {
- for (int outerCount = 0; outerCount < iterations; outerCount++)
- {
- XidImpl xid = null;
- if (commitSize > 0)
- {
- xid = new XidImpl("bq1".getBytes(), 1, UUIDGenerator.getInstance().generateStringUUID().getBytes());
- xaResource.start(xid, XAResource.TMNOFLAGS);
- }
- for (int innerCount = 0; innerCount < commitSize; innerCount++)
- {
- Message m = sess.createMessage();
- m.setStringProperty("PROD_NAME", prodName);
- m.setIntProperty("MSG_NUMBER", outerCount * commitSize + innerCount);
- prod.send(m);
- }
- if (commitSize > 0)
- {
- xaResource.end(xid, XAResource.TMSUCCESS);
- xaResource.prepare(xid);
- xaResource.commit(xid, false);
- }
- if (rollbackSize > 0)
- {
- xid = new XidImpl("bq1".getBytes(), 1, UUIDGenerator.getInstance().generateStringUUID().getBytes());
- xaResource.start(xid, XAResource.TMNOFLAGS);
- }
- for (int innerCount = 0; innerCount < rollbackSize; innerCount++)
- {
- Message m = sess.createMessage();
- m.setStringProperty("PROD_NAME", prodName);
- m.setIntProperty("MSG_NUMBER", (outerCount + 1) * commitSize + innerCount);
- prod.send(m);
- }
- if (rollbackSize > 0)
- {
- xaResource.end(xid, XAResource.TMSUCCESS);
- xaResource.prepare(xid);
- xaResource.rollback(xid);
- }
- }
- }
- catch (Exception e)
- {
- Transactional2PCSender.log.error("Failed to send message", e);
- setFailed(true);
- }
- }
-}
diff --git a/tests/jms-tests/src/test/java/org/apache/activemq/jms/tests/stress/TransactionalReceiver.java b/tests/jms-tests/src/test/java/org/apache/activemq/jms/tests/stress/TransactionalReceiver.java
deleted file mode 100644
index 90617c5332..0000000000
--- a/tests/jms-tests/src/test/java/org/apache/activemq/jms/tests/stress/TransactionalReceiver.java
+++ /dev/null
@@ -1,200 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.activemq.jms.tests.stress;
-
-import org.apache.activemq.jms.tests.JmsTestLogger;
-
-import javax.jms.Message;
-import javax.jms.MessageConsumer;
-import javax.jms.Session;
-
-/**
- *
- * A Receiver that receives messages from a destination in a JMS transaction
- *
- * Receives messages then commits, then
- * Receives messages then rollsback until
- * a total of messages have been received (committed)
- * must be a multiple of
- *
- * @author Tim Fox
- *
- */
-public class TransactionalReceiver extends Receiver
-{
- private static final JmsTestLogger log = JmsTestLogger.LOGGER;
-
- protected int commitSize;
-
- protected int rollbackSize;
-
- class Count
- {
- int lastCommitted;
-
- int lastReceived;
- }
-
- public TransactionalReceiver(final Session sess,
- final MessageConsumer cons,
- final int numMessages,
- final int commitSize,
- final int rollbackSize,
- final boolean isListener) throws Exception
- {
- super(sess, cons, numMessages, isListener);
- this.commitSize = commitSize;
- this.rollbackSize = rollbackSize;
- }
-
- @Override
- public void run()
- {
- // Small pause so as not to miss any messages in a topic
- try
- {
- Thread.sleep(1000);
- }
- catch (InterruptedException e)
- {
- }
-
- try
- {
- int iterations = numMessages / commitSize;
-
- for (int outerCount = 0; outerCount < iterations; outerCount++)
- {
- for (int innerCount = 0; innerCount < commitSize; innerCount++)
- {
- Message m = getMessage();
-
- if (m == null)
- {
- TransactionalReceiver.log.error("Message is null");
- setFailed(true);
- return;
- }
- String prodName = m.getStringProperty("PROD_NAME");
- Integer msgCount = new Integer(m.getIntProperty("MSG_NUMBER"));
-
- Count count = (Count)counts.get(prodName);
- if (count == null)
- {
- // First time
- if (msgCount.intValue() != 0)
- {
- TransactionalReceiver.log.error("First message from " + prodName + " is not 0, it is " + msgCount);
- setFailed(true);
- return;
- }
- else
- {
- count = new Count();
- counts.put(prodName, count);
- }
- }
- else
- {
- if (count.lastCommitted != msgCount.intValue() - 1)
- {
- TransactionalReceiver.log.error("Message out of sequence for " + m.getJMSMessageID() +
- " " +
- prodName +
- ", expected " +
- (count.lastCommitted + 1) +
- ", actual " +
- msgCount);
- setFailed(true);
- return;
- }
- }
- count.lastCommitted = msgCount.intValue();
-
- count.lastReceived = msgCount.intValue();
-
- if (innerCount == commitSize - 1)
- {
- sess.commit();
- }
-
- processingDone();
- }
-
- if (outerCount == iterations - 1)
- {
- break;
- }
-
- for (int innerCount = 0; innerCount < rollbackSize; innerCount++)
- {
- Message m = getMessage();
-
- if (m == null)
- {
- TransactionalReceiver.log.error("Message is null");
- setFailed(true);
- return;
- }
- String prodName = m.getStringProperty("PROD_NAME");
- Integer msgCount = new Integer(m.getIntProperty("MSG_NUMBER"));
-
- Count count = (Count)counts.get(prodName);
- if (count == null)
- {
- // First time
- if (msgCount.intValue() != 0)
- {
- TransactionalReceiver.log.error("First message from " + prodName + " is not 0, it is " + msgCount);
- setFailed(true);
- return;
- }
- else
- {
- count = new Count();
- count.lastCommitted = -1;
- counts.put(prodName, count);
- }
- }
- else
- {
- if (count.lastReceived != msgCount.intValue() - 1)
- {
- TransactionalReceiver.log.error("Message out of sequence");
- setFailed(true);
- return;
- }
- }
- count.lastReceived = msgCount.intValue();
-
- if (innerCount == rollbackSize - 1 && outerCount != iterations - 1)
- {
- // Don't roll back on the very last one
- sess.rollback();
- }
- processingDone();
- }
- }
- finished();
- }
- catch (Exception e)
- {
- TransactionalReceiver.log.error("Failed to receive message", e);
- setFailed(true);
- }
- }
-}
diff --git a/tests/jms-tests/src/test/java/org/apache/activemq/jms/tests/stress/TransactionalSender.java b/tests/jms-tests/src/test/java/org/apache/activemq/jms/tests/stress/TransactionalSender.java
deleted file mode 100644
index 4c19344d59..0000000000
--- a/tests/jms-tests/src/test/java/org/apache/activemq/jms/tests/stress/TransactionalSender.java
+++ /dev/null
@@ -1,93 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.activemq.jms.tests.stress;
-
-import org.apache.activemq.jms.tests.JmsTestLogger;
-
-import javax.jms.Message;
-import javax.jms.MessageProducer;
-import javax.jms.Session;
-
-/**
- *
- * A Sender that sends messages to a destination in a JMS transaction.
- *
- * Sends messages to a destination in a jms transaction.
- * Sends messages then commits, then
- * sends messages then rollsback until
- * a total of messages have been sent (commitSize)
- * must be a multiple of
- *
- * @author Tim Fox
- *
- */
-public class TransactionalSender extends Sender
-{
- private static final JmsTestLogger log = JmsTestLogger.LOGGER;
-
- protected int commitSize;
-
- protected int rollbackSize;
-
- public TransactionalSender(final String prodName,
- final Session sess,
- final MessageProducer prod,
- final int numMessages,
- final int commitSize,
- final int rollbackSize)
- {
- super(prodName, sess, prod, numMessages);
-
- this.commitSize = commitSize;
- this.rollbackSize = rollbackSize;
-
- }
-
- @Override
- public void run()
- {
- int iterations = numMessages / commitSize;
-
- try
- {
- for (int outerCount = 0; outerCount < iterations; outerCount++)
- {
- for (int innerCount = 0; innerCount < commitSize; innerCount++)
- {
- Message m = sess.createMessage();
- m.setStringProperty("PROD_NAME", prodName);
- m.setIntProperty("MSG_NUMBER", outerCount * commitSize + innerCount);
- prod.send(m);
- }
- sess.commit();
- for (int innerCount = 0; innerCount < rollbackSize; innerCount++)
- {
- Message m = sess.createMessage();
- m.setStringProperty("PROD_NAME", prodName);
- m.setIntProperty("MSG_NUMBER", (outerCount + 1) * commitSize + innerCount);
- prod.send(m);
- }
- sess.rollback();
- }
- }
- catch (Exception e)
- {
- TransactionalSender.log.error("Failed to send message", e);
- setFailed(true);
- }
- }
-}
diff --git a/tests/joram-tests/pom.xml b/tests/joram-tests/pom.xml
index fac51b4448..81c0fd330a 100644
--- a/tests/joram-tests/pom.xml
+++ b/tests/joram-tests/pom.xml
@@ -21,7 +21,7 @@
org.apache.activemq.tests
activemq-tests-pom
- 6.0.0
+ 6.0.1-SNAPSHOT
joram-tests
diff --git a/tests/performance-tests/pom.xml b/tests/performance-tests/pom.xml
index 4457b83202..3e6dc27402 100644
--- a/tests/performance-tests/pom.xml
+++ b/tests/performance-tests/pom.xml
@@ -21,7 +21,7 @@
org.apache.activemq.tests
activemq-tests-pom
- 6.0.0
+ 6.0.1-SNAPSHOT
performance-tests
diff --git a/tests/pom.xml b/tests/pom.xml
index 65c02c27d7..f2db04d6c7 100644
--- a/tests/pom.xml
+++ b/tests/pom.xml
@@ -19,7 +19,7 @@
org.apache.activemq
activemq-pom
- 6.0.0
+ 6.0.1-SNAPSHOT
ActiveMQ6 Tests POM
@@ -77,15 +77,9 @@
- banned-tests
+ extra-tests
- jms-tests
- integration-tests
- byteman-tests
- soak-tests
- stress-tests
- concurrent-tests
- performance-tests
+ extra-tests
@@ -94,5 +88,10 @@
unit-tests
joram-tests
timing-tests
+ jms-tests
+ integration-tests
+ soak-tests
+ stress-tests
+ performance-tests